diff --git a/CLAUDE.md b/CLAUDE.md index e62c858..33b9a3c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -2,6 +2,23 @@ This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. +# CRITICAL RULES + +- Scan the existing codebase and reuse existing functions wherever possible. +- Keep all imports within functions unless they must be mocked in a test. +- If an import is small, performative, and significantly reduces needs for new code, use the library. +- Write short Sphinx docstrings as a single line description, a single line for each parameter, and no empty lines. +- On first line of docstrings use \n instead of line break. +- Variable names must be `snake_case` sequence of descriptive words <=5 letters long +- Keep labels consistent across the entire project. +- In commit messages: use `+` for code adds, `-` for code subtractions, `~` for refactors/fixes. +- Write full variable names at all times. No abbreviations. +- Use descriptive variable names instead of comments. +- No inline comments. +- No emoji. +- No global variables. +- No semantic commit messages. + ## Commands ```bash diff --git a/README.md b/README.md index daf4884..10fdf8b 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ --- language: - en -library_name: nnll +library_name: negate license_name: MPL-2.0 + Commons Clause 1.0 compatibility: - macos @@ -20,6 +20,27 @@ A scanning, training, and research library for detecting the origin of digital i [](https://ko-fi.com/darkshapes)

+### About + +Negate is a modular system of image processing and feature extraction pipelines that measure machine aptitude of differentiating between synthetic and human-origin illustrations. + +### Included Methods + +| Texture | Color | VAE Loss | Residual | Perturbation | Noise/Jitter | +| ----------------------------- | --------------------------------- | -------- | ------------- | ------------------ | ----------------- | +| local binary pattern | histogram oriented gradient (hog) | l1 | spectral | haar wavelet | snr/noise entropy | +| gray lvl co-occurrence matrix | variance | mse | laplacian | random resize crop | stroke features | +| energy | kurtosis | k1 | gaussian diff | patchification | | +| complexity | skew | bce | sobel | | | +| microtexture | palette features | | | | | + +### Feature Processing Options + +- Decision Tree + PCA +- SVM (RBF) +- MLP +- LR + ## Quick Start ![MacOS](https://darkshapes.org/img/macos.svg) Terminal @@ -55,7 +76,9 @@ Train a new model with the following command: ## Technical Details & Research Results -
Expand +### Abstract + +Previous research has demonstrated the possibility of identifying deepfakes, synthetic images, illustrations and photographs. Yet generative models have since undergone dramatic improvements, challenging past identification research and calling into question the future efficacy of these developments. Most methods chose images easily discernible as synthetic by the naked eye of a trained artist, or evaluated their success against open models exclusively. In this work, we create a comprehensive analysis suite for decomposition and feature extraction of digital images to study the effectiveness of these methods. Then, using an ensemble of previous techniques, we train simple decision trees and SVM models on these features to achieve >70% accuracy in detecting synthetic vs. genuine illustrations. Our methods of training and inference require only consumer-grade hardware, use exclusively consensual datasets provided by artists and Creative-Commons sources, and provide reliable estimates against the modern image products of both open and black-box closed-source models. ### Structure @@ -71,17 +94,15 @@ Directories are located within `$HOME\.local\bin\uv\tools` or `.local/bin/uv/too --- -| Module | Summary | Purpose | -| ------------ | ------------------- | ------------------------------------------------------------------------------------------------------------------------------ | -| negate | core module | Root source code folder. Creates CLI arguments and interprets commands. | -| →→ decompose | image processing | Random Resize Crop and Haar Wavelet transformations - [arxiv:2511.14030](https://arxiv.org/abs/2511.14030) | -| →→ extract | feature processing | Laplace/Sobel/Spectral analysis, VIT/VAE extraction, cross‑entropy loss - [arxiv:2411.19417](https://arxiv.org/abs/2411.19417) | -| →→ io | load / save / state | Hyperparameters, image datasets, console messages, model serialization and conversion. | -| →→ metrics | evaluation | Graphs, visualizations, model performance metadata, and a variety of heuristics for results interpretation. | -| → inference | predictions | Detector functions to determine origin from trained model predictions. | -| → train | XGBoost | PCA data transforms and gradient-boosted decision tree model training. | - -### Research +| Module | Summary | Purpose | +| ------------ | ------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------- | +| negate | core module | Root source code folder. Creates CLI arguments and interprets commands. | +| →→ decompose | image processing | RRC, Wavelet Transform - [arxiv:2511.14030](https://arxiv.org/abs/2511.14030) [https://arxiv.org/abs/2504.07078](https://arxiv.org/abs/2504.07078) | +| →→ extract | feature processing | Residual analysis, VIT/VAE extraction, cross‑entropy loss - [arxiv:2411.19417](https://arxiv.org/abs/2411.19417) | +| →→ io | load / save / state | Hyperparameters, image datasets, console messages, model serialization and conversion. | +| →→ metrics | evaluation | Graphs, visualizations, model performance metadata, and a variety of heuristics for results interpretation. | +| → inference | predictions | Detector functions to determine origin from trained model predictions. | +| → train | XGBoost | PCA data transforms and gradient-boosted decision tree model training. |
@@ -89,10 +110,6 @@ Directories are located within `$HOME\.local\bin\uv\tools` or `.local/bin/uv/too Visualization of VAE mean loss results for the Flux Klein model
-The ubiqity of online services, connected presence, generative models, and the proliferate digital output that has accompanied these nascent developments have yielded a colossal and simultaneous disintegration of trust, judgement and ecological welfare, exacerbating prevailing struggles in all species of life. While the outcome of these deep-seated issues is beyond the means of a small group of academic researchers to determine, and while remediation efforts will require far more resources than attention alone, we have nevertheless taken pause to reconsider the consequences of our way of life while investigating the prospects of new avenues that may diminish harm. - -
- ```bib @misc{darkshapes2026, author={darkshapes}, diff --git a/config/config.toml b/config/config.toml index 173bd32..2a9b3b5 100644 --- a/config/config.toml +++ b/config/config.toml @@ -17,9 +17,9 @@ feat_ext_path = "" # Path to save the model in or null for default: [$HO [datasets] eval_data = ["tellif/ai_vs_real_image_semantically_similar"] -genuine_data = ["KarimSayed/cat-breed-fiass-index"] +genuine_data = ["huggan/wikiart"] #["KarimSayed/cat-breed-fiass-index"] genuine_local = [] -synthetic_data = ["exdysa/nano-banana-pro-generated-1k-clone", "ash12321/seedream-4.5-generated-2k"] +synthetic_data = ["exdysa/nano-banana-pro-generated-1k-clone"] #, "ash12321/seedream-4.5-generated-2k"] synthetic_local = [] [vae.library] @@ -57,3 +57,16 @@ n_components = 0.95 # Number of components for dimensionality reduction num_boost_round = 200 # Number of boosting rounds test_size = 0.2 # 80/20 training split default verbose_eval = 20 + +[ensemble] +sample_size = 100 +n_folds = 5 +abstain_threshold = 0.3 +svm_c = 10.0 +mlp_hidden_layers = 100 +mlp_activation = "relu" +mlp_max_iter = 1000 +cv = 3 +method = "sigmoid" +gamma = "scale" +kernel = "rbf" diff --git a/negate/decompose/surface.py b/negate/decompose/surface.py new file mode 100644 index 0000000..80b27fd --- /dev/null +++ b/negate/decompose/surface.py @@ -0,0 +1,253 @@ +# SPDX-License-Identifier: MPL-2.0 AND LicenseRef-Commons-Clause-License-Condition-1.0 +# + +"""Extended frequency analysis branch (FFT/DCT) that captures spectral fingerprints left by generative models. + +Features are grouped into 6 categories: + - Brightness (2): mean, entropy + - Color (23): RGB/HSV histogram statistics + - Texture (6): GLCM + LBP + - Shape (6): HOG + edge length + - Noise (2): noise entropy, SNR + - Frequency (10): FFT/DCT spectral analysis +""" + +from __future__ import annotations + +from typing import Any +import numpy as np +from numpy.typing import NDArray +from PIL.Image import Image +from scipy.stats import skew, kurtosis +from skimage.feature import graycomatrix, graycoprops, local_binary_pattern + + +class NumericImage: + image: Image + TARGET_SIZE = (255, 255) + + def __init__(self, image: Image) -> None: + self._image = image + self.to_gray() + self.to_rgb() + self.rgb2hsv() + + @property + def gray(self) -> np.ndarray[tuple[Any, ...], np.dtype[np.float64]]: + return self.shade + + @property + def color(self): + return self.rgb + + @property + def hsv(self): + return self._hsv + + def to_gray(self) -> NDArray: + """Resize and convert to float64 grayscale.""" + img = self._image.convert("L").resize(self.TARGET_SIZE, Image.BICUBIC) + self.shade = np.asarray(img, dtype=np.float64) / 255.0 + + def to_rgb(self) -> NDArray: + """Resize and convert to float64 RGB [0,1].""" + img = self._image.convert("RGB").resize(self.TARGET_SIZE, Image.BICUBIC) + self.rgb = np.asarray(img, dtype=np.float64) / 255.0 + + def rgb2hsv(self) -> NDArray: + """Convert RGB [0,1] array to HSV [0,1].""" + from colorsys import _hsv_from_rgb as hsv_from_rgb + + rgb = self.rgb.copy() + rgb = rgb / 255.0 if rgb.max() > 1 else rgb + h, w, c = rgb.shape + flat = rgb.reshape(-1, 3) + result = np.array([hsv_from_rgb(r, g, b) for r, g, b in flat]) + self._hsv = result.T.reshape(h, w, 3) + + +class SurfaceFeatures: + """Extract artwork features for AI detection. + + Usage: + >>> img = NumericImage(pil_image) + >>> extractor = VisualFeatures(img) + >>> features = extractor() + >>> len(features) + """ + + def __init__(self, image: NumericImage): + self.image = image + + def __call__(self) -> dict[str, float]: + """Extract all features from the NumericImage. + + :returns: Dictionary of scalar features. + """ + gray = self.image.gray + rgb = self.image.color + + features: dict[str, float] = {} + features |= self.brightness_features(gray) + features |= self.color_features(rgb) + features |= self.texture_features(gray) + features |= self.shape_features(gray) + features |= self.noise_features(gray) + features |= self.frequency_features(gray) + + return features + + def entropy(counts: NDArray) -> float: + """Compute Shannon entropy from histogram counts.""" + probs = counts / counts.sum() + probs = probs[probs > 0] + return -np.sum(probs * np.log2(probs)) + + def brightness_features(self, gray: NDArray) -> dict[str, float]: + """Mean and entropy of pixel brightness.""" + return { + "mean_brightness": float(gray.mean()), + "entropy_brightness": float(self.entropy(np.histogram(gray, bins=256, range=(0, 1))[0] + 1e-10)), + } + + def color_features(self, rgb: NDArray) -> dict[str, float]: + """RGB and HSV histogram statistics""" + features: dict[str, float] = {} + + for i, name in enumerate(("red", "green", "blue")): + channel = rgb[:, :, i].ravel() + features[f"{name}_mean"] = float(channel.mean()) + features[f"{name}_variance"] = float(channel.var()) + features[f"{name}_kurtosis"] = float(kurtosis(channel)) + features[f"{name}_skewness"] = float(skew(channel)) + + rgb_flat = rgb.reshape(-1, 3) + rgb_hist = np.histogramdd(rgb_flat, bins=32)[0] + features["rgb_entropy"] = float(self.entropy(rgb_hist.ravel() + 1e-10)) + + hsv = self.image.hsv + for i, name in enumerate(("hue", "saturation", "value")): + channel = hsv[:, :, i].ravel() + features[f"{name}_variance"] = float(channel.var()) + features[f"{name}_kurtosis"] = float(kurtosis(channel)) + features[f"{name}_skewness"] = float(skew(channel)) + + hsv_flat = hsv.reshape(-1, 3) + hsv_hist = np.histogramdd(hsv_flat, bins=32)[0] + features["hsv_entropy"] = float(self.entropy(hsv_hist.ravel() + 1e-10)) + + return features + + def shape_features(self, gray: NDArray) -> dict[str, float]: + """HOG statistics and edge length.""" + from skimage.feature import hog + from PIL import Image as PilImage + import numpy as np + + hog_features = hog(gray, pixels_per_cell=(16, 16), cells_per_block=(2, 2), feature_vector=True) + + features: dict[str, float] = { + "hog_mean": float(hog_features.mean()), + "hog_variance": float(hog_features.var()), + "hog_kurtosis": float(kurtosis(hog_features)), + "hog_skewness": float(skew(hog_features)), + "hog_entropy": float(self.entropy(np.histogram(hog_features, bins=50)[0] + 1e-10)), + } + + gray_uint8 = (gray * 255).astype(np.uint8) + edges_array = np.asarray(PilImage.fromarray(gray_uint8).convert("L").point(lambda x: 0 if x < 128 else 255, "1")) + features["edgelen"] = float(edges_array.sum()) + + return features + + def noise_features(self, gray: NDArray) -> dict[str, float]: + """Noise entropy and signal-to-noise ratio.""" + from skimage.restoration import estimate_sigma + + sigma = estimate_sigma(gray) + noise = gray - np.clip(gray, gray.mean() - 2 * sigma, gray.mean() + 2 * sigma) + + noise_hist = np.histogram(noise.ravel(), bins=256)[0] + noise_ent = float(self.entropy(noise_hist + 1e-10)) + + signal_power = float(gray.var()) + noise_power = float(sigma**2) if sigma > 0 else 1e-10 + snr = float(10 * np.log10(signal_power / noise_power + 1e-10)) + + return { + "noise_entropy": noise_ent, + "snr": snr, + } + + def texture_features(self, gray: NDArray) -> dict[str, float]: + """GLCM and LBP texture features.""" + gray_uint8 = (gray * 255).astype(np.uint8) if gray.max() <= 1 else gray.astype(np.uint8) + + glcm = graycomatrix(gray_uint8, distances=[1], angles=[0], levels=256, symmetric=True, normed=True) + + features: dict[str, float] = { + "contrast": float(graycoprops(glcm, "contrast")[0, 0]), + "correlation": float(graycoprops(glcm, "correlation")[0, 0]), + "energy": float(graycoprops(glcm, "energy")[0, 0]), + "homogeneity": float(graycoprops(glcm, "homogeneity")[0, 0]), + } + + lbp = local_binary_pattern(gray_uint8, P=8, R=1, method="uniform") + features["lbp_entropy"] = float(self.entropy(np.histogram(lbp, bins=10)[0] + 1e-10)) + features["lbp_variance"] = float(lbp.var()) + + return features + + def frequency_features(self, gray: NDArray) -> dict[str, float]: + """FFT and DCT spectral analysis features meant to capture upsampling layers and attention patterns.""" + + from scipy.fft import dctn + from numpy.fft import fftfreq + + height, width = gray.shape + + fft_2d = np.fft.fft2(gray) + fft_shift = np.fft.fftshift(fft_2d) + magnitude = np.abs(fft_shift) + log_mag = np.log(magnitude + 1e-10) + phase = np.angle(fft_shift) + + center_h, center_w = height // 2, width // 2 + + y, x = np.ogrid[:height, :width] + radius = np.sqrt((x - center_w) ** 2 + (y - center_h) ** 2) + max_r = np.sqrt(center_h**2 + center_w**2) + + low_mask = radius < max_r * 0.2 + mid_mask = (radius >= max_r * 0.2) & (radius < max_r * 0.6) + high_mask = radius >= max_r * 0.6 + + total_energy = float((magnitude**2).sum() + 1e-10) + low_energy = float((magnitude[low_mask] ** 2).sum()) + mid_energy = float((magnitude[mid_mask] ** 2).sum()) + high_energy = float((magnitude[high_mask] ** 2).sum()) + + row_freqs = fftfreq(height)[:, None] * np.ones((1, width)) + col_freqs = np.ones((height, 1)) * fftfreq(width)[None, :] + spectral_centroid = float((np.sum(log_mag * np.abs(row_freqs)) + np.sum(log_mag * np.abs(col_freqs))) / (log_mag.sum() * 2 + 1e-10)) + + dct_coeffs = dctn(gray, type=2, norm="ortho") + dct_mag = np.abs(dct_coeffs) + + flat_dc_energy = float(dct_mag[0, 0] ** 2) + detail_ac_energy = float((dct_mag**2).sum() - flat_dc_energy) + + phase_coherence = float(phase.std()) + + return { + "fft_low_energy_ratio": low_energy / total_energy, + "fft_mid_energy_ratio": mid_energy / total_energy, + "fft_high_energy_ratio": high_energy / total_energy, + "fft_spectral_centroid": spectral_centroid, + "fft_log_mag_mean": float(log_mag.mean()), + "fft_log_mag_std": float(log_mag.std()), + "fft_phase_std": phase_coherence, + "dct_ac_dc_ratio": detail_ac_energy / (flat_dc_energy + 1e-10), + "dct_high_freq_energy": float((dct_mag[height // 2 :, width // 2 :] ** 2).sum() / (dct_mag**2).sum()), + "dct_sparsity": float((dct_mag < 0.01 * dct_mag.max()).mean()), + } diff --git a/negate/extract/ensemble.py b/negate/extract/ensemble.py new file mode 100644 index 0000000..e62ff8c --- /dev/null +++ b/negate/extract/ensemble.py @@ -0,0 +1,168 @@ +# SPDX-License-Identifier: MPL-2.0 AND LicenseRef-Commons-Clause-License-Condition-1.0 +# + +"""Generate results PDF with multi-signal ensemble, calibrated thresholds, +abstention, and full precision/recall/F1 reporting. + +Usage: uv run python tests/generate_results_pdf.py +Output: results/artwork_detection_results.pdf +""" + +from __future__ import annotations + +from negate.decompose.surface import SurfaceFeatures +from negate.io.datasets import build_datasets +from negate.io.spec import Spec +from negate.metrics.pdf import generate_pdf + + +def load_and_extract(spec: Spec): + """Load dataset and extract surface features for ensemble evaluation. + :param spec: Specification containing data paths and hyperparameters. + :returns: Tuple of (features array, labels, feature names, gen images, synthetic images). + """ + import numpy as np + import pandas as pd + from tqdm import tqdm + + genuine_repo = spec.data_paths.genuine_repo + synthetic_repo = spec.data_paths.synthetic_repo + sample_size = spec.hyper_param.sample_size + + print(f"Loading {sample_size} human art + {sample_size} AI images...") + + dataset = build_datasets(spec, genuine_repo, synthetic_repo) + extractor = SurfaceFeatures() + features, labels = [], [] + + for row in tqdm(dataset, desc="Extracting artwork features"): + features.append(extractor(row["image"])) + labels.append(row["label"]) + + df = pd.DataFrame(features).fillna(0) + X = np.where(np.isfinite(df.to_numpy(dtype=np.float64)), df.to_numpy(dtype=np.float64), 0) + y = np.array(labels) + gen_data = dataset.filter(lambda x: x["label"] == 0) + syn_data = dataset.filter(lambda x: x["label"] == 1) + return X, y, list(df.columns), gen_data, syn_data + + +def run_ensemble_cv(X, y, spec: Spec): + """Run calibrated ensemble with abstention using spec hyperparameters. + :param X: Feature matrix. + :param y: Label vector. + :param spec: Specification containing model hyperparameters and config. + :returns: Tuple of (results dict, ensemble probabilities, predictions, full model). + """ + import numpy as np + import xgboost as xgb + from sklearn.calibration import CalibratedClassifierCV + from sklearn.metrics import ( + f1_score, + precision_score, + recall_score, + ) + from sklearn.model_selection import StratifiedKFold, cross_val_predict + from sklearn.neural_network import MLPClassifier + from sklearn.preprocessing import StandardScaler + from sklearn.svm import SVC + + hp = spec.hyper_param + ens = spec.ensemble + + scaler = StandardScaler() + X_s = scaler.fit_transform(X) + skf = StratifiedKFold(n_splits=ens.n_folds, shuffle=True, random_state=hp.seed) + + models = { + "SVM": CalibratedClassifierCV(SVC(C=ens.svm_c, gamma=ens.gamma, kernel=ens.kernel, random_state=hp.seed), cv=ens.cv, method=ens.method), + "MLP": CalibratedClassifierCV( + MLPClassifier(hidden_layer_sizes=(ens.mlp_hidden_layers,), activation=ens.mlp_activation, max_iter=ens.mlp_max_iter, random_state=hp.seed), + cv=ens.cv, + method=ens.method, + ), + } + + model_probs = {} + model_preds = {} + for name, model in models.items(): + probs = cross_val_predict(model, X_s, y, cv=skf, method="predict_proba")[:, 1] + model_probs[name] = probs + model_preds[name] = (probs > 0.5).astype(int) + + xgb_probs = np.zeros(len(y)) + for train_idx, test_idx in skf.split(X_s, y): + params = { + "sample_size": ens.sample_size, + "abstain_threshold": ens.abstain_threshold, + "n_folds": ens.n_folds, + **hp, + } + dtrain = xgb.DMatrix(X_s[train_idx], label=y[train_idx]) + dtest = xgb.DMatrix(X_s[test_idx]) + model = xgb.train( + params, + dtrain, + num_boost_round=spec.train_rounds.num_boost_round, + evals=[(xgb.DMatrix(X_s[test_idx], label=y[test_idx]), "test")], + early_stopping_rounds=spec.train_rounds.early_stopping_rounds, + verbose_eval=spec.train_rounds.verbose_eval, + ) + xgb_probs[test_idx] = model.predict(dtest) + + model_probs["XGBoost"] = xgb_probs + model_preds["XGBoost"] = (xgb_probs > 0.5).astype(int) + + ensemble_probs = sum(model_probs.values()) / len(model_probs) + ensemble_preds = (ensemble_probs > 0.5).astype(int) + + model_probs["Ensemble"] = ensemble_probs + model_preds["Ensemble"] = ensemble_preds + + results = {} + for name in model_probs: + probs = model_probs[name] + preds = model_preds[name] + results[name] = { + "accuracy": (preds == y).mean(), + "precision": precision_score(y, preds), + "recall": recall_score(y, preds), + "f1": f1_score(y, preds), + } + + abstain_thresh = ens.abstain_threshold + uncertain_mask = (ensemble_probs > abstain_thresh) & (ensemble_probs < (1 - abstain_thresh)) + confident_preds = ensemble_preds.copy() + confident_preds[uncertain_mask] = -1 # Mark uncertain as -1 + + results["Ensemble_With_Abstention"] = { + "accuracy": (confident_preds == y).sum() / (y.shape[0] - uncertain_mask.sum()) if (y.shape[0] - uncertain_mask.sum()) > 0 else 0, + "abstention_rate": uncertain_mask.mean(), + } + + full_xgb_params = {**spec.hyper_param} + full_model = xgb.train(full_xgb_params, xgb.DMatrix(X_s, label=y), num_boost_round=spec.train_rounds.num_boost_round) + + return results, ensemble_probs, ensemble_preds, full_model + + +def main(): + import numpy as np + + X, y, names, imgs_h, imgs_a = load_and_extract() + print(f"Dataset: {np.sum(y == 0)} Genuine + {np.sum(y == 1)} Synthetic, {X.shape[1]} features") + + results, ens_probs, ens_preds, model = run_ensemble_cv(X, y) + + print(f"\n{'Model':<15} {'Acc':>8} {'Prec':>8} {'Rec':>8} {'F1':>8} {'AUC':>8}") + print("-" * 55) + for name, r in results.items(): + extra = f" ({r.get('n_abstained', '-')} abstained)" if "n_abstained" in r else "" + print(f"{name:<15} {r['accuracy']:>7.1%} {r['precision']:>7.1%} {r['recall']:>7.1%} {r['f1']:>7.1%} {r['roc_auc']:>7.4f}{extra}") + + generate_pdf(X, y, names, results, ens_probs, ens_preds, model, imgs_h, imgs_a) + print("Done.") + + +if __name__ == "__main__": + main() diff --git a/negate/extract/feature_artwork.py b/negate/extract/feature_artwork.py deleted file mode 100644 index 58bc7c5..0000000 --- a/negate/extract/feature_artwork.py +++ /dev/null @@ -1,252 +0,0 @@ -# SPDX-License-Identifier: MPL-2.0 AND LicenseRef-Commons-Clause-License-Condition-1.0 -# - -"""Artwork feature extraction for AI-generated image detection. - -Implements the 39-feature extraction pipeline from: - Li & Stamp, "Detecting AI-generated Artwork", arXiv:2504.07078, 2025. - -Extended with a dedicated frequency analysis branch (FFT/DCT) that captures -spectral fingerprints left by generative models. - -Features are grouped into 6 categories: - - Brightness (2): mean, entropy - - Color (23): RGB/HSV histogram statistics - - Texture (6): GLCM + LBP - - Shape (6): HOG + edge length - - Noise (2): noise entropy, SNR - - Frequency (10): FFT/DCT spectral analysis -""" - -from __future__ import annotations - -import numpy as np -from numpy.typing import NDArray -from PIL import Image -from scipy.stats import entropy, kurtosis, skew -from skimage.color import rgb2gray, rgb2hsv -from skimage.feature import graycomatrix, graycoprops, local_binary_pattern - - -_TARGET_SIZE = (255, 255) - - -def _to_array(image: Image.Image) -> NDArray: - """Resize to 255x255 and convert to float64 numpy array.""" - image = image.convert("RGB").resize(_TARGET_SIZE, Image.BICUBIC) - return np.asarray(image, dtype=np.float64) - - -def _brightness_features(gray: NDArray) -> dict[str, float]: - """Mean and entropy of pixel brightness.""" - return { - "mean_brightness": float(gray.mean()), - "entropy_brightness": float(entropy(np.histogram(gray, bins=256, range=(0, 1))[0] + 1e-10)), - } - - -def _color_features(rgb: NDArray) -> dict[str, float]: - """RGB and HSV histogram statistics (23 features).""" - features: dict[str, float] = {} - - # RGB: mean, variance, kurtosis, skewness per channel + entropy - for i, name in enumerate(("red", "green", "blue")): - channel = rgb[:, :, i].ravel() - features[f"{name}_mean"] = float(channel.mean()) - features[f"{name}_variance"] = float(channel.var()) - features[f"{name}_kurtosis"] = float(kurtosis(channel)) - features[f"{name}_skewness"] = float(skew(channel)) - - # RGB entropy (joint) - rgb_flat = rgb.reshape(-1, 3) - rgb_hist = np.histogramdd(rgb_flat, bins=32)[0] - features["rgb_entropy"] = float(entropy(rgb_hist.ravel() + 1e-10)) - - # HSV: variance, kurtosis, skewness per channel + entropy - hsv = rgb2hsv(rgb / 255.0 if rgb.max() > 1 else rgb) - for i, name in enumerate(("hue", "saturation", "value")): - channel = hsv[:, :, i].ravel() - features[f"{name}_variance"] = float(channel.var()) - features[f"{name}_kurtosis"] = float(kurtosis(channel)) - features[f"{name}_skewness"] = float(skew(channel)) - - hsv_flat = hsv.reshape(-1, 3) - hsv_hist = np.histogramdd(hsv_flat, bins=32)[0] - features["hsv_entropy"] = float(entropy(hsv_hist.ravel() + 1e-10)) - - return features - - -def _texture_features(gray: NDArray) -> dict[str, float]: - """GLCM and LBP texture features (6 features).""" - # GLCM requires uint8 - gray_uint8 = (gray * 255).astype(np.uint8) if gray.max() <= 1 else gray.astype(np.uint8) - - glcm = graycomatrix(gray_uint8, distances=[1], angles=[0], levels=256, symmetric=True, normed=True) - - features: dict[str, float] = { - "contrast": float(graycoprops(glcm, "contrast")[0, 0]), - "correlation": float(graycoprops(glcm, "correlation")[0, 0]), - "energy": float(graycoprops(glcm, "energy")[0, 0]), - "homogeneity": float(graycoprops(glcm, "homogeneity")[0, 0]), - } - - # LBP - lbp = local_binary_pattern(gray_uint8, P=8, R=1, method="uniform") - features["lbp_entropy"] = float(entropy(np.histogram(lbp, bins=10)[0] + 1e-10)) - features["lbp_variance"] = float(lbp.var()) - - return features - - -def _shape_features(gray: NDArray) -> dict[str, float]: - """HOG statistics and edge length (6 features).""" - from skimage.feature import hog, canny - - # HOG - hog_features = hog(gray, pixels_per_cell=(16, 16), cells_per_block=(2, 2), feature_vector=True) - - features: dict[str, float] = { - "hog_mean": float(hog_features.mean()), - "hog_variance": float(hog_features.var()), - "hog_kurtosis": float(kurtosis(hog_features)), - "hog_skewness": float(skew(hog_features)), - "hog_entropy": float(entropy(np.histogram(hog_features, bins=50)[0] + 1e-10)), - } - - # Edge length via Canny - edges = canny(gray if gray.max() <= 1 else gray / 255.0) - features["edgelen"] = float(edges.sum()) - - return features - - -def _noise_features(gray: NDArray) -> dict[str, float]: - """Noise entropy and signal-to-noise ratio (2 features).""" - from skimage.restoration import estimate_sigma - - # Estimate noise - sigma = estimate_sigma(gray) - noise = gray - np.clip(gray, gray.mean() - 2 * sigma, gray.mean() + 2 * sigma) - - noise_hist = np.histogram(noise.ravel(), bins=256)[0] - noise_ent = float(entropy(noise_hist + 1e-10)) - - # SNR - signal_power = float(gray.var()) - noise_power = float(sigma ** 2) if sigma > 0 else 1e-10 - snr = float(10 * np.log10(signal_power / noise_power + 1e-10)) - - return { - "noise_entropy": noise_ent, - "snr": snr, - } - - -def _frequency_features(gray: NDArray) -> dict[str, float]: - """FFT and DCT spectral analysis features (10 features). - - AI generators leave characteristic signatures in the frequency domain - due to upsampling layers and attention patterns. This branch captures - those patterns independently of pixel-space features. - """ - from scipy.fft import dctn - from numpy.fft import fftfreq - - h, w = gray.shape - - # 2D FFT analysis - fft_2d = np.fft.fft2(gray) - fft_shift = np.fft.fftshift(fft_2d) - magnitude = np.abs(fft_shift) - log_mag = np.log(magnitude + 1e-10) - phase = np.angle(fft_shift) - - center_h, center_w = h // 2, w // 2 - - # Radial frequency bands (low/mid/high) - y, x = np.ogrid[:h, :w] - radius = np.sqrt((x - center_w) ** 2 + (y - center_h) ** 2) - max_r = np.sqrt(center_h ** 2 + center_w ** 2) - - low_mask = radius < max_r * 0.2 - mid_mask = (radius >= max_r * 0.2) & (radius < max_r * 0.6) - high_mask = radius >= max_r * 0.6 - - total_energy = float((magnitude ** 2).sum() + 1e-10) - low_energy = float((magnitude[low_mask] ** 2).sum()) - mid_energy = float((magnitude[mid_mask] ** 2).sum()) - high_energy = float((magnitude[high_mask] ** 2).sum()) - - # Spectral centroid (center of mass of frequency distribution) - row_freqs = fftfreq(h)[:, None] * np.ones((1, w)) - col_freqs = np.ones((h, 1)) * fftfreq(w)[None, :] - spectral_centroid = float( - (np.sum(log_mag * np.abs(row_freqs)) + np.sum(log_mag * np.abs(col_freqs))) - / (log_mag.sum() * 2 + 1e-10) - ) - - # DCT analysis — captures compression and generation artifacts - dct_coeffs = dctn(gray, type=2, norm="ortho") - dct_mag = np.abs(dct_coeffs) - - # Ratio of AC to DC energy (how much detail vs flat) - dc_energy = float(dct_mag[0, 0] ** 2) - ac_energy = float((dct_mag ** 2).sum() - dc_energy) - - # Phase coherence — AI images often have more regular phase patterns - phase_std = float(phase.std()) - - return { - "fft_low_energy_ratio": low_energy / total_energy, - "fft_mid_energy_ratio": mid_energy / total_energy, - "fft_high_energy_ratio": high_energy / total_energy, - "fft_spectral_centroid": spectral_centroid, - "fft_log_mag_mean": float(log_mag.mean()), - "fft_log_mag_std": float(log_mag.std()), - "fft_phase_std": phase_std, - "dct_ac_dc_ratio": ac_energy / (dc_energy + 1e-10), - "dct_high_freq_energy": float((dct_mag[h // 2:, w // 2:] ** 2).sum() / (dct_mag ** 2).sum()), - "dct_sparsity": float((dct_mag < 0.01 * dct_mag.max()).mean()), - } - - -class ArtworkExtract: - """Extract artwork features for AI detection. - - Combines the 39 features from Li & Stamp (2025) with a dedicated - frequency analysis branch (10 features) for 49 total features. - - All features are CPU-only and work on any image type (photos, - illustrations, artwork). No pretrained models required. - - Usage: - >>> extractor = ArtworkExtract() - >>> features = extractor(pil_image) - >>> len(features) # 49 - """ - - def __call__(self, image: Image.Image) -> dict[str, float]: - """Extract all features from a single PIL image. - - :param image: PIL Image in any mode (will be converted to RGB). - :returns: Dictionary of scalar features. - """ - rgb = _to_array(image) - gray = rgb2gray(rgb / 255.0 if rgb.max() > 1 else rgb) - - features: dict[str, float] = {} - features |= _brightness_features(gray) - features |= _color_features(rgb) - features |= _texture_features(gray) - features |= _shape_features(gray) - features |= _noise_features(gray) - features |= _frequency_features(gray) - - return features - - def feature_names(self) -> list[str]: - """Return ordered list of feature names.""" - # Generate from a dummy image to get exact keys - dummy = Image.new("RGB", (255, 255), color="gray") - return list(self(dummy).keys()) diff --git a/negate/extract/feature_style.py b/negate/extract/feature_style.py deleted file mode 100644 index 8adf8d4..0000000 --- a/negate/extract/feature_style.py +++ /dev/null @@ -1,308 +0,0 @@ -# SPDX-License-Identifier: MPL-2.0 AND LicenseRef-Commons-Clause-License-Condition-1.0 -# - -"""Style-specific feature extraction for AI-generated artwork detection. - -Captures properties of human artistic craft that AI generators struggle to -replicate authentically: - -Features (15 total): - - Stroke analysis (4): direction variance, length distribution, pressure simulation - - Color palette (4): palette size, harmony, temperature variance, saturation coherence - - Composition (4): rule-of-thirds energy, symmetry score, focal point strength, edge density distribution - - Micro-texture (3): grain regularity, patch-level entropy variance, brushwork periodicity -""" - -from __future__ import annotations - -import numpy as np -from numpy.typing import NDArray -from PIL import Image -from scipy.stats import entropy, kurtosis -from scipy.ndimage import sobel, gaussian_filter, uniform_filter - -_TARGET_SIZE = (255, 255) - - -def _to_gray(image: Image.Image) -> NDArray: - """Resize and convert to float64 grayscale.""" - img = image.convert("L").resize(_TARGET_SIZE, Image.BICUBIC) - return np.asarray(img, dtype=np.float64) / 255.0 - - -def _to_rgb(image: Image.Image) -> NDArray: - """Resize and convert to float64 RGB [0,1].""" - img = image.convert("RGB").resize(_TARGET_SIZE, Image.BICUBIC) - return np.asarray(img, dtype=np.float64) / 255.0 - - -def _stroke_features(gray: NDArray) -> dict[str, float]: - """Analyze brush stroke properties via gradient analysis. - - Human artists have variable stroke direction and pressure. - AI tends to produce more uniform gradient patterns. - """ - # Gradient direction via Sobel - gx = sobel(gray, axis=1) - gy = sobel(gray, axis=0) - magnitude = np.sqrt(gx**2 + gy**2) - direction = np.arctan2(gy, gx) - - # Only analyze pixels with significant gradient (edges/strokes) - threshold = np.percentile(magnitude, 75) - stroke_mask = magnitude > threshold - stroke_directions = direction[stroke_mask] - stroke_magnitudes = magnitude[stroke_mask] - - # Direction variance — humans have more varied stroke directions - dir_hist = np.histogram(stroke_directions, bins=36, range=(-np.pi, np.pi))[0] - stroke_dir_entropy = float(entropy(dir_hist + 1e-10)) - - # Direction variance in local patches (16x16) - h, w = gray.shape - patch_size = 16 - local_dir_vars = [] - for y in range(0, h - patch_size, patch_size): - for x in range(0, w - patch_size, patch_size): - patch_dirs = direction[y:y+patch_size, x:x+patch_size] - patch_mags = magnitude[y:y+patch_size, x:x+patch_size] - # Weight by magnitude - if patch_mags.sum() > 1e-10: - weighted_var = float(np.average( - (patch_dirs - np.average(patch_dirs, weights=patch_mags + 1e-10))**2, - weights=patch_mags + 1e-10 - )) - local_dir_vars.append(weighted_var) - - # Stroke pressure simulation — variation in gradient magnitude along strokes - # Humans have pressure variation; AI is more uniform - pressure_kurtosis = float(kurtosis(stroke_magnitudes)) if len(stroke_magnitudes) > 4 else 0.0 - - # Stroke length distribution — via connected component-like analysis - # Use thresholded magnitude as binary stroke map - stroke_binary = (magnitude > threshold).astype(np.float64) - # Row-wise and col-wise run lengths - runs = [] - for row in stroke_binary: - current_run = 0 - for val in row: - if val > 0: - current_run += 1 - elif current_run > 0: - runs.append(current_run) - current_run = 0 - stroke_length_var = float(np.var(runs)) if len(runs) > 1 else 0.0 - - return { - "stroke_dir_entropy": stroke_dir_entropy, - "stroke_local_dir_var": float(np.mean(local_dir_vars)) if local_dir_vars else 0.0, - "stroke_pressure_kurtosis": pressure_kurtosis, - "stroke_length_var": stroke_length_var, - } - - -def _palette_features(rgb: NDArray) -> dict[str, float]: - """Analyze color palette properties. - - Human artists work with deliberate, often limited palettes. - AI generators tend to use broader, less coherent color distributions. - """ - # Flatten to pixel colors - pixels = rgb.reshape(-1, 3) - - # Effective palette size — number of distinct color clusters - # Quantize to 8-level per channel and count unique - quantized = (pixels * 7).astype(int) - unique_colors = len(set(map(tuple, quantized))) - max_possible = 8**3 # 512 - palette_richness = float(unique_colors / max_possible) - - # Color harmony — measure how well colors cluster in HSV hue space - from skimage.color import rgb2hsv - hsv = rgb2hsv(rgb) - hue = hsv[:, :, 0].ravel() - sat = hsv[:, :, 1].ravel() - - # Only consider saturated pixels (ignore grays) - saturated = sat > 0.15 - if saturated.sum() > 10: - hue_saturated = hue[saturated] - hue_hist = np.histogram(hue_saturated, bins=36, range=(0, 1))[0] - # Harmony = how peaked the hue distribution is (fewer peaks = more harmonious) - hue_entropy = float(entropy(hue_hist + 1e-10)) - # Peak count — number of significant hue modes - hue_smooth = gaussian_filter(hue_hist.astype(float), sigma=2) - peaks = np.sum((hue_smooth[1:-1] > hue_smooth[:-2]) & (hue_smooth[1:-1] > hue_smooth[2:])) - palette_harmony = float(peaks) - else: - hue_entropy = 0.0 - palette_harmony = 0.0 - - # Temperature variance — warm vs cool across image regions - # Warm = red/yellow hue, cool = blue/green - patch_size = 32 - h, w = rgb.shape[:2] - temps = [] - for y in range(0, h - patch_size, patch_size): - for x in range(0, w - patch_size, patch_size): - patch = rgb[y:y+patch_size, x:x+patch_size] - # Simple temperature: red-channel dominance vs blue - temp = float(patch[:, :, 0].mean() - patch[:, :, 2].mean()) - temps.append(temp) - temp_variance = float(np.var(temps)) if temps else 0.0 - - # Saturation coherence — how consistent saturation is across patches - sat_patches = [] - for y in range(0, h - patch_size, patch_size): - for x in range(0, w - patch_size, patch_size): - patch_sat = hsv[y:y+patch_size, x:x+patch_size, 1] - sat_patches.append(float(patch_sat.mean())) - sat_coherence = float(np.std(sat_patches)) if sat_patches else 0.0 - - return { - "palette_richness": palette_richness, - "palette_hue_entropy": hue_entropy, - "palette_harmony_peaks": palette_harmony, - "palette_temp_variance": temp_variance, - } - - -def _composition_features(gray: NDArray) -> dict[str, float]: - """Analyze compositional properties. - - Human artists follow compositional rules (rule of thirds, focal points). - AI images may have different compositional statistics. - """ - h, w = gray.shape - - # Rule of thirds — energy at third lines vs elsewhere - third_h = [h // 3, 2 * h // 3] - third_w = [w // 3, 2 * w // 3] - margin = max(h, w) // 20 - - # Energy at third intersections - thirds_energy = 0.0 - for th in third_h: - for tw in third_w: - y_lo = max(0, th - margin) - y_hi = min(h, th + margin) - x_lo = max(0, tw - margin) - x_hi = min(w, tw + margin) - thirds_energy += float(gray[y_lo:y_hi, x_lo:x_hi].var()) - thirds_energy /= 4.0 - - total_energy = float(gray.var()) - thirds_ratio = thirds_energy / (total_energy + 1e-10) - - # Symmetry — correlation between left and right halves - left = gray[:, :w//2] - right = gray[:, w//2:w//2 + left.shape[1]][:, ::-1] # mirror - if left.shape == right.shape: - symmetry = float(np.corrcoef(left.ravel(), right.ravel())[0, 1]) - else: - symmetry = 0.0 - - # Focal point strength — how concentrated the high-detail areas are - detail = np.abs(sobel(gray, axis=0)) + np.abs(sobel(gray, axis=1)) - detail_flat = detail.ravel() - total_detail = detail_flat.sum() + 1e-10 - - # Find center of mass of detail - yy, xx = np.mgrid[:h, :w] - cy = float(np.sum(yy * detail) / total_detail) - cx = float(np.sum(xx * detail) / total_detail) - - # Concentration around center of mass (lower = more focused focal point) - dist_from_focal = np.sqrt((yy - cy)**2 + (xx - cx)**2) - focal_spread = float(np.sum(dist_from_focal * detail) / total_detail) - focal_strength = 1.0 / (focal_spread + 1.0) # inverse = stronger focal point - - # Edge density distribution — where edges are in the image (center vs periphery) - edges = detail > np.percentile(detail, 80) - center_mask = np.zeros_like(edges) - ch, cw = h // 4, w // 4 - center_mask[ch:3*ch, cw:3*cw] = True - center_edge_ratio = float(edges[center_mask].sum()) / (float(edges.sum()) + 1e-10) - - return { - "comp_thirds_ratio": thirds_ratio, - "comp_symmetry": symmetry, - "comp_focal_strength": focal_strength, - "comp_center_edge_ratio": center_edge_ratio, - } - - -def _microtexture_features(gray: NDArray) -> dict[str, float]: - """Analyze micro-texture properties. - - Human art has irregular grain from physical media (canvas, paper, pigment). - AI images have subtly different micro-texture statistics. - """ - h, w = gray.shape - patch_size = 16 - - # Patch-level entropy variance - patch_entropies = [] - for y in range(0, h - patch_size, patch_size): - for x in range(0, w - patch_size, patch_size): - patch = gray[y:y+patch_size, x:x+patch_size] - hist = np.histogram(patch, bins=32, range=(0, 1))[0] - patch_entropies.append(float(entropy(hist + 1e-10))) - - entropy_variance = float(np.var(patch_entropies)) if patch_entropies else 0.0 - - # Grain regularity — autocorrelation of high-frequency residual - # High-pass via difference from blurred version - blurred = gaussian_filter(gray, sigma=1.0) - residual = gray - blurred - - # Autocorrelation at small lags (grain regularity) - res_flat = residual.ravel() - if len(res_flat) > 100: - acf_1 = float(np.corrcoef(res_flat[:-1], res_flat[1:])[0, 1]) - acf_2 = float(np.corrcoef(res_flat[:-2], res_flat[2:])[0, 1]) - else: - acf_1, acf_2 = 0.0, 0.0 - - grain_regularity = (acf_1 + acf_2) / 2.0 # higher = more regular/periodic grain - - # Brushwork periodicity — FFT of the residual, look for peaks - fft_res = np.fft.fft2(residual) - fft_mag = np.abs(fft_res) - # Ratio of peak to mean (higher = more periodic = more AI-like) - fft_peak_ratio = float(fft_mag.max() / (fft_mag.mean() + 1e-10)) - - return { - "micro_entropy_variance": entropy_variance, - "micro_grain_regularity": grain_regularity, - "micro_brushwork_periodicity": fft_peak_ratio, - } - - -class StyleExtract: - """Extract 15 style-specific features for artwork AI detection. - - These features target properties of human artistic craft: - stroke patterns, color palettes, composition, and micro-texture. - - Usage: - >>> extractor = StyleExtract() - >>> features = extractor(pil_image) - >>> len(features) # 15 - """ - - def __call__(self, image: Image.Image) -> dict[str, float]: - gray = _to_gray(image) - rgb = _to_rgb(image) - - features: dict[str, float] = {} - features |= _stroke_features(gray) - features |= _palette_features(rgb) - features |= _composition_features(gray) - features |= _microtexture_features(gray) - - return features - - def feature_names(self) -> list[str]: - dummy = Image.new("RGB", (255, 255), color="gray") - return list(self(dummy).keys()) diff --git a/negate/io/config.py b/negate/io/config.py index 7c6d8b5..e4e7f84 100644 --- a/negate/io/config.py +++ b/negate/io/config.py @@ -115,6 +115,22 @@ class NegateDataPaths(NamedTuple): synthetic_local: list +class NegateEnsembleConfig(NamedTuple): + """Configuration for ensemble detection and evaluation.""" + + sample_size: int + n_folds: int + abstain_threshold: float + svm_c: int + mlp_hidden_layers: int + mlp_activation: str + mlp_max_iter: int + cv: int + method: str + gamma: str + kernel: str + + class NegateModelConfig: """Model configuration with library auto-selection.""" @@ -271,7 +287,9 @@ def np_dtype(self, value: str) -> np.typing.DTypeLike: return self._np_dtype -def load_config_options(file_path_named: str = f"config{os.sep}config.toml") -> tuple[NegateConfig, NegateHyperParam, NegateDataPaths, NegateModelConfig, Chip, NegateTrainRounds]: +def load_config_options( + file_path_named: str = f"config{os.sep}config.toml", +) -> tuple[NegateConfig, NegateHyperParam, NegateEnsembleConfig, NegateDataPaths, NegateModelConfig, Chip, NegateTrainRounds]: """Load configuration options.\n :return: Tuple of (NegateConfig, NegateHyperParam, NegateDataPaths).""" @@ -285,10 +303,12 @@ def load_config_options(file_path_named: str = f"config{os.sep}config.toml") -> dataset_cfg = data.pop("datasets", {}) library_cfg = data.pop("library", {}) rounds_cfg = data.pop("rounds", {}) + ensemble_cfg = data.pop("ensemble", {}) return ( NegateConfig(**data), NegateHyperParam(**param_cfg), + NegateEnsembleConfig(**ensemble_cfg), NegateDataPaths(**dataset_cfg), NegateModelConfig(data=models | library_cfg, vae=vae), Chip(), @@ -296,4 +316,4 @@ def load_config_options(file_path_named: str = f"config{os.sep}config.toml") -> ) -negate_options, hyperparam_config, data_paths, model_config, chip, train_rounds = load_config_options() +negate_options, hyperparam_config, ensemble_config, data_paths, model_config, chip, train_rounds = load_config_options() diff --git a/negate/io/datasets.py b/negate/io/datasets.py index 8f95227..2c92641 100644 --- a/negate/io/datasets.py +++ b/negate/io/datasets.py @@ -13,7 +13,14 @@ from negate.io.spec import Spec, root_folder -def prepare_dataset(features_dataset: Dataset, spec: Spec): +def prepare_dataset(features_dataset: Dataset, spec: Spec) -> np.ndarray: + """Transform nested wavelet feature dictionaries into a flat numerical matrix.\n + :param features_dataset: HuggingFace Dataset with 'results' column containing list of dicts. + :param spec: Specification container with dtype and ONNX configuration. + :return: 2D numpy array of shape (samples, features) ready for model input. + :raises KeyError: If 'results' column is missing from dataset. + """ + samples = features_dataset["results"] all_dicts = [d for row in samples for d in row] @@ -89,6 +96,7 @@ def build_datasets( spec: Spec, genuine_path: Path | None = None, synthetic_path: Path | None = None, + concatenate: bool = True, ) -> Dataset: """Builds synthetic and genuine datasets.\n :param input_folder: Path to folder containing data. (optional) diff --git a/negate/io/spec.py b/negate/io/spec.py index c772d59..f9be37f 100644 --- a/negate/io/spec.py +++ b/negate/io/spec.py @@ -1,6 +1,7 @@ # SPDX-License-Identifier: MPL-2.0 AND LicenseRef-Commons-Clause-License-Condition-1.0 # + import json import tomllib from dataclasses import dataclass @@ -17,6 +18,7 @@ NegateConfig, NegateDataPaths, NegateHyperParam, + NegateEnsembleConfig, NegateModelConfig, NegateTrainRounds, chip, @@ -27,6 +29,7 @@ negate_options, root_folder, train_rounds, + ensemble_config, ) @@ -93,6 +96,7 @@ def __init__( self, negate_options=negate_options, hyperparam_config=hyperparam_config, + ensemble_config=ensemble_config, data_paths=data_paths, model_config=model_config, chip=chip, @@ -106,6 +110,7 @@ def __init__( self.apply: dict[str, torch.device | torch.dtype] = {"device": self.device, "dtype": self.dtype} self.np_dtype: np.typing.DTypeLike = chip.np_dtype self.hyper_param: NegateHyperParam = hyperparam_config + self.ensemble: NegateEnsembleConfig = ensemble_config self.train_rounds: NegateTrainRounds = train_rounds self.models: list[str] = [repo for repo in model_config.list_models] self.model = model_config.auto_model[0] diff --git a/tests/generate_results_pdf.py b/negate/metrics/pdf.py similarity index 57% rename from tests/generate_results_pdf.py rename to negate/metrics/pdf.py index 1ed4fdd..472f514 100644 --- a/tests/generate_results_pdf.py +++ b/negate/metrics/pdf.py @@ -1,169 +1,29 @@ # SPDX-License-Identifier: MPL-2.0 AND LicenseRef-Commons-Clause-License-Condition-1.0 # -"""Generate results PDF with multi-signal ensemble, calibrated thresholds, -abstention, and full precision/recall/F1 reporting. +from pathlib import Path -Usage: uv run python tests/generate_results_pdf.py -Output: results/artwork_detection_results.pdf -""" -from __future__ import annotations +def generate_pdf(X, y, feature_names, results, ensemble_probs, ensemble_preds, model, imgs_human, imgs_ai): -import sys -from datetime import datetime -from pathlib import Path + import numpy as np + import matplotlib + + matplotlib.use("Agg") + import matplotlib.pyplot as plt + import matplotlib.gridspec as gridspec + from matplotlib.backends.backend_pdf import PdfPages + from matplotlib.patches import Patch + from datetime import datetime + from sklearn.metrics import ( + roc_curve, + confusion_matrix, + precision_recall_curve, + ) + + OUTPUT_DIR: Path = Path(__file__).parent.parent / "results" + N_FOLDS = 5 -import matplotlib -matplotlib.use("Agg") -import matplotlib.pyplot as plt -import matplotlib.gridspec as gridspec -from matplotlib.backends.backend_pdf import PdfPages -from matplotlib.patches import Patch -import numpy as np -import pandas as pd -import xgboost as xgb -from datasets import load_dataset, Image as HFImage -from sklearn.calibration import CalibratedClassifierCV -from sklearn.metrics import ( - accuracy_score, precision_score, recall_score, f1_score, - roc_auc_score, roc_curve, confusion_matrix, precision_recall_curve, -) -from sklearn.model_selection import StratifiedKFold, cross_val_predict -from sklearn.neural_network import MLPClassifier -from sklearn.preprocessing import StandardScaler -from sklearn.svm import SVC -from tqdm import tqdm - -sys.path.insert(0, str(Path(__file__).parent.parent)) -from negate.extract.feature_artwork import ArtworkExtract - -HUMAN_ART_REPO = "huggan/wikiart" -SYNTHETIC_REPO = "exdysa/nano-banana-pro-generated-1k-clone" -SAMPLE_SIZE = 100 -N_FOLDS = 5 -SEED = 42 -OUTPUT_DIR = Path(__file__).parent.parent / "results" - - -def load_and_extract(): - print(f"Loading {SAMPLE_SIZE} human art + {SAMPLE_SIZE} AI images...") - human_ds = load_dataset(HUMAN_ART_REPO, split=f"train[:{SAMPLE_SIZE}]") - human_ds = human_ds.cast_column("image", HFImage(decode=True, mode="RGB")) - ai_ds = load_dataset(SYNTHETIC_REPO, split=f"train[:{SAMPLE_SIZE}]") - ai_ds = ai_ds.cast_column("image", HFImage(decode=True, mode="RGB")) - - extractor = ArtworkExtract() - features, labels = [], [] - imgs_human, imgs_ai = [], [] - - for row in tqdm(human_ds, desc="Human art"): - try: - features.append(extractor(row["image"])) - labels.append(0) - if len(imgs_human) < 4: - imgs_human.append(row["image"]) - except Exception: - pass - - for row in tqdm(ai_ds, desc="AI art"): - try: - features.append(extractor(row["image"])) - labels.append(1) - if len(imgs_ai) < 4: - imgs_ai.append(row["image"]) - except Exception: - pass - - df = pd.DataFrame(features).fillna(0) - X = np.where(np.isfinite(df.to_numpy(dtype=np.float64)), df.to_numpy(dtype=np.float64), 0) - y = np.array(labels) - return X, y, list(df.columns), imgs_human, imgs_ai - - -def run_ensemble_cv(X, y): - """Run calibrated ensemble with abstention.""" - scaler = StandardScaler() - X_s = scaler.fit_transform(X) - skf = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=SEED) - - # Individual models (calibrated with Platt scaling) - models = { - "SVM": CalibratedClassifierCV(SVC(C=10, gamma="scale", kernel="rbf", random_state=SEED), cv=3, method="sigmoid"), - "MLP": CalibratedClassifierCV(MLPClassifier(hidden_layer_sizes=(100,), activation="relu", max_iter=1000, random_state=SEED), cv=3, method="sigmoid"), - } - - # Collect per-model CV predictions - model_probs = {} - model_preds = {} - for name, model in models.items(): - probs = cross_val_predict(model, X_s, y, cv=skf, method="predict_proba")[:, 1] - model_probs[name] = probs - model_preds[name] = (probs > 0.5).astype(int) - - # XGBoost (already outputs calibrated probabilities) - xgb_probs = np.zeros(len(y)) - for train_idx, test_idx in skf.split(X_s, y): - params = {"objective": "binary:logistic", "max_depth": 4, "learning_rate": 0.1, - "subsample": 0.8, "colsample_bytree": 0.8, "seed": SEED, "eval_metric": "logloss"} - dtrain = xgb.DMatrix(X_s[train_idx], label=y[train_idx]) - dtest = xgb.DMatrix(X_s[test_idx]) - model = xgb.train(params, dtrain, num_boost_round=200, - evals=[(xgb.DMatrix(X_s[test_idx], label=y[test_idx]), "test")], - early_stopping_rounds=10, verbose_eval=False) - xgb_probs[test_idx] = model.predict(dtest) - - model_probs["XGBoost"] = xgb_probs - model_preds["XGBoost"] = (xgb_probs > 0.5).astype(int) - - # Ensemble: average calibrated probabilities - ensemble_probs = np.mean([model_probs[n] for n in model_probs], axis=0) - - # Abstention: if ensemble confidence < threshold, mark as uncertain - ABSTAIN_THRESH = 0.3 # abstain if prob between 0.3 and 0.7 - ensemble_preds = np.full(len(y), -1) # -1 = uncertain - ensemble_preds[ensemble_probs > (1 - ABSTAIN_THRESH)] = 1 # AI - ensemble_preds[ensemble_probs < ABSTAIN_THRESH] = 0 # Human - - # Per-model metrics - results = {} - for name in model_probs: - pred = model_preds[name] - results[name] = { - "accuracy": accuracy_score(y, pred), - "precision": precision_score(y, pred, zero_division=0), - "recall": recall_score(y, pred, zero_division=0), - "f1": f1_score(y, pred, average="macro"), - "roc_auc": roc_auc_score(y, model_probs[name]), - "probs": model_probs[name], - } - - # Ensemble metrics (excluding abstained samples) - confident_mask = ensemble_preds >= 0 - n_abstained = int((~confident_mask).sum()) - if confident_mask.sum() > 0: - results["Ensemble"] = { - "accuracy": accuracy_score(y[confident_mask], ensemble_preds[confident_mask]), - "precision": precision_score(y[confident_mask], ensemble_preds[confident_mask], zero_division=0), - "recall": recall_score(y[confident_mask], ensemble_preds[confident_mask], zero_division=0), - "f1": f1_score(y[confident_mask], ensemble_preds[confident_mask], average="macro"), - "roc_auc": roc_auc_score(y, ensemble_probs), - "probs": ensemble_probs, - "n_abstained": n_abstained, - "n_classified": int(confident_mask.sum()), - } - - # Feature importance (full XGBoost model) - feature_names = [f"f{i}" for i in range(X.shape[1])] - dtrain_full = xgb.DMatrix(X_s, label=y, feature_names=feature_names) - full_model = xgb.train({"objective": "binary:logistic", "max_depth": 4, "seed": SEED}, - dtrain_full, num_boost_round=100, verbose_eval=False) - - return results, ensemble_probs, ensemble_preds, full_model - - -def generate_pdf(X, y, feature_names, results, ensemble_probs, ensemble_preds, - model, imgs_human, imgs_ai): OUTPUT_DIR.mkdir(exist_ok=True) pdf_path = OUTPUT_DIR / "artwork_detection_results.pdf" @@ -174,12 +34,9 @@ def generate_pdf(X, y, feature_names, results, ensemble_probs, ensemble_preds, ax = fig.add_axes([0, 0, 1, 1]) ax.axis("off") - ax.text(0.5, 0.92, "AI-Generated Artwork Detection", fontsize=22, fontweight="bold", - ha="center", fontfamily="serif", transform=ax.transAxes) - ax.text(0.5, 0.87, "Multi-Signal Ensemble with Calibrated Thresholds", - fontsize=12, ha="center", fontfamily="serif", style="italic", transform=ax.transAxes) - ax.text(0.5, 0.83, f"negate project | {datetime.now().strftime('%B %d, %Y')}", - fontsize=10, ha="center", fontfamily="serif", transform=ax.transAxes) + ax.text(0.5, 0.92, "AI-Generated Artwork Detection", fontsize=22, fontweight="bold", ha="center", fontfamily="serif", transform=ax.transAxes) + ax.text(0.5, 0.87, "Multi-Signal Ensemble with Calibrated Thresholds", fontsize=12, ha="center", fontfamily="serif", style="italic", transform=ax.transAxes) + ax.text(0.5, 0.83, f"negate project | {datetime.now().strftime('%B %d, %Y')}", fontsize=10, ha="center", fontfamily="serif", transform=ax.transAxes) # Results table ax_table = fig.add_axes([0.08, 0.52, 0.84, 0.26]) @@ -187,8 +44,7 @@ def generate_pdf(X, y, feature_names, results, ensemble_probs, ensemble_preds, table_data = [] for name, r in results.items(): - row = [name, f"{r['accuracy']:.1%}", f"{r['precision']:.1%}", - f"{r['recall']:.1%}", f"{r['f1']:.1%}", f"{r['roc_auc']:.4f}"] + row = [name, f"{r['accuracy']:.1%}", f"{r['precision']:.1%}", f"{r['recall']:.1%}", f"{r['f1']:.1%}", f"{r['roc_auc']:.4f}"] if name == "Ensemble": row.append(f"{r['n_abstained']}") else: @@ -199,7 +55,8 @@ def generate_pdf(X, y, feature_names, results, ensemble_probs, ensemble_preds, table = ax_table.table( cellText=table_data, colLabels=["Model", "Accuracy", "Precision", "Recall", "F1", "AUC", "Abstained"], - loc="center", cellLoc="center", + loc="center", + cellLoc="center", ) table.auto_set_font_size(False) table.set_fontsize(8.5) @@ -216,7 +73,7 @@ def generate_pdf(X, y, feature_names, results, ensemble_probs, ensemble_preds, summary = ( "Approach\n\n" f" Features: {X.shape[1]} (39 artwork + 10 frequency analysis)\n" - f" Dataset: {np.sum(y==0)} human artworks (WikiArt) + {np.sum(y==1)} AI images\n" + f" Dataset: {np.sum(y == 0)} human artworks (WikiArt) + {np.sum(y == 1)} AI images\n" f" CV: {N_FOLDS}-fold stratified cross-validation\n\n" " Three calibrated classifiers (SVM, MLP, XGBoost) vote via averaged\n" " probabilities. Images where ensemble confidence is between 30-70%\n" @@ -224,8 +81,7 @@ def generate_pdf(X, y, feature_names, results, ensemble_probs, ensemble_preds, " Precision = of images flagged AI, how many actually are\n" " Recall = of actual AI images, how many were caught" ) - ax.text(0.08, 0.48, summary, fontsize=9, ha="left", va="top", fontfamily="serif", - transform=ax.transAxes) + ax.text(0.08, 0.48, summary, fontsize=9, ha="left", va="top", fontfamily="serif", transform=ax.transAxes) # Key findings findings = ( @@ -234,14 +90,22 @@ def generate_pdf(X, y, feature_names, results, ensemble_probs, ensemble_preds, f"{ens.get('recall', 0):.1%} recall on classified images\n" f" 2. {ens.get('n_abstained', 0)} uncertain images abstained from " f"(reduces false positives)\n" - f" 3. +{(ens.get('accuracy', 0) - 0.633)*100:.1f}pp improvement over " + f" 3. +{(ens.get('accuracy', 0) - 0.633) * 100:.1f}pp improvement over " "existing negate pipeline (63.3%)\n" f" 4. Frequency features (FFT/DCT) add spectral artifact detection\n" " 5. All processing is CPU-only, ~12 images/sec" ) - ax.text(0.08, 0.24, findings, fontsize=9, ha="left", va="top", fontfamily="serif", - transform=ax.transAxes, - bbox=dict(boxstyle="round,pad=0.4", facecolor="#E8F5E9", edgecolor="#66BB6A")) + ax.text( + 0.08, + 0.24, + findings, + fontsize=9, + ha="left", + va="top", + fontfamily="serif", + transform=ax.transAxes, + bbox=dict(boxstyle="round,pad=0.4", facecolor="#E8F5E9", edgecolor="#66BB6A"), + ) pdf.savefig(fig) plt.close(fig) @@ -249,16 +113,14 @@ def generate_pdf(X, y, feature_names, results, ensemble_probs, ensemble_preds, # ===== PAGE 2: ROC + PR curves + Confusion Matrix ===== fig = plt.figure(figsize=(8.5, 11)) fig.patch.set_facecolor("white") - fig.suptitle("Detection Performance Analysis", fontsize=14, - fontweight="bold", fontfamily="serif", y=0.96) + fig.suptitle("Detection Performance Analysis", fontsize=14, fontweight="bold", fontfamily="serif", y=0.96) # ROC curves ax_roc = fig.add_axes([0.08, 0.62, 0.4, 0.28]) colors = {"SVM": "#4472C4", "MLP": "#ED7D31", "XGBoost": "#70AD47", "Ensemble": "#C00000"} for name, r in results.items(): fpr, tpr, _ = roc_curve(y, r["probs"]) - ax_roc.plot(fpr, tpr, color=colors.get(name, "gray"), linewidth=2, - label=f"{name} ({r['roc_auc']:.3f})") + ax_roc.plot(fpr, tpr, color=colors.get(name, "gray"), linewidth=2, label=f"{name} ({r['roc_auc']:.3f})") ax_roc.plot([0, 1], [0, 1], "k--", alpha=0.3) ax_roc.set_xlabel("False Positive Rate", fontsize=9) ax_roc.set_ylabel("True Positive Rate", fontsize=9) @@ -270,8 +132,7 @@ def generate_pdf(X, y, feature_names, results, ensemble_probs, ensemble_preds, ax_pr = fig.add_axes([0.55, 0.62, 0.4, 0.28]) for name, r in results.items(): prec_curve, rec_curve, _ = precision_recall_curve(y, r["probs"]) - ax_pr.plot(rec_curve, prec_curve, color=colors.get(name, "gray"), linewidth=2, - label=name) + ax_pr.plot(rec_curve, prec_curve, color=colors.get(name, "gray"), linewidth=2, label=name) ax_pr.set_xlabel("Recall", fontsize=9) ax_pr.set_ylabel("Precision", fontsize=9) ax_pr.set_title("Precision-Recall Curves", fontsize=10, fontfamily="serif") @@ -293,8 +154,7 @@ def generate_pdf(X, y, feature_names, results, ensemble_probs, ensemble_preds, ax_cm.set_title("Ensemble (confident only)", fontsize=10, fontfamily="serif") for i in range(2): for j in range(2): - ax_cm.text(j, i, str(cm[i, j]), ha="center", va="center", fontsize=16, - fontweight="bold", color="white" if cm[i, j] > cm.max()/2 else "black") + ax_cm.text(j, i, str(cm[i, j]), ha="center", va="center", fontsize=16, fontweight="bold", color="white" if cm[i, j] > cm.max() / 2 else "black") # Probability distribution ax_hist = fig.add_axes([0.55, 0.28, 0.4, 0.26]) @@ -313,19 +173,17 @@ def generate_pdf(X, y, feature_names, results, ensemble_probs, ensemble_preds, # Per-model agreement analysis ax_agree = fig.add_axes([0.08, 0.04, 0.84, 0.18]) ax_agree.axis("off") - n_all_agree = sum(1 for i in range(len(y)) - if len(set(results[n]["probs"][i] > 0.5 for n in ["SVM", "MLP", "XGBoost"])) == 1) + n_all_agree = sum(1 for i in range(len(y)) if len(set(results[n]["probs"][i] > 0.5 for n in ["SVM", "MLP", "XGBoost"])) == 1) n_disagree = len(y) - n_all_agree agree_text = ( "Model Agreement Analysis\n\n" - f" All 3 models agree: {n_all_agree}/{len(y)} ({n_all_agree/len(y):.0%})\n" - f" At least 1 disagrees: {n_disagree}/{len(y)} ({n_disagree/len(y):.0%})\n\n" + f" All 3 models agree: {n_all_agree}/{len(y)} ({n_all_agree / len(y):.0%})\n" + f" At least 1 disagrees: {n_disagree}/{len(y)} ({n_disagree / len(y):.0%})\n\n" " When models disagree, the ensemble uses averaged probability with\n" " abstention zone (0.3-0.7). This reduces false positives at the cost\n" " of some unclassified images -- a deliberate tradeoff for precision." ) - ax_agree.text(0, 1, agree_text, fontsize=9, ha="left", va="top", fontfamily="serif", - transform=ax_agree.transAxes) + ax_agree.text(0, 1, agree_text, fontsize=9, ha="left", va="top", fontfamily="serif", transform=ax_agree.transAxes) pdf.savefig(fig) plt.close(fig) @@ -333,8 +191,7 @@ def generate_pdf(X, y, feature_names, results, ensemble_probs, ensemble_preds, # ===== PAGE 3: Feature Analysis + Examples ===== fig = plt.figure(figsize=(8.5, 11)) fig.patch.set_facecolor("white") - fig.suptitle("Feature Analysis & Examples", fontsize=14, - fontweight="bold", fontfamily="serif", y=0.96) + fig.suptitle("Feature Analysis & Examples", fontsize=14, fontweight="bold", fontfamily="serif", y=0.96) # Example images n = min(4, len(imgs_human), len(imgs_ai)) @@ -342,12 +199,12 @@ def generate_pdf(X, y, feature_names, results, ensemble_probs, ensemble_preds, for i in range(n): ax = fig.add_subplot(gs[0, i]) ax.imshow(imgs_human[i]) - ax.set_title(f"Human #{i+1}", fontsize=8) + ax.set_title(f"Human #{i + 1}", fontsize=8) ax.axis("off") for i in range(n): ax = fig.add_subplot(gs[1, i]) ax.imshow(imgs_ai[i]) - ax.set_title(f"AI #{i+1}", fontsize=8) + ax.set_title(f"AI #{i + 1}", fontsize=8) ax.axis("off") # Feature importance @@ -363,10 +220,17 @@ def generate_pdf(X, y, feature_names, results, ensemble_probs, ensemble_preds, disp_names = disp_names[::-1] gains = [x[1] for x in sorted_imp][::-1] - color_map = {"fft": "#C00000", "dct": "#C00000", - "hog": "#ED7D31", "edge": "#ED7D31", - "lbp": "#70AD47", "contrast": "#70AD47", "correlation": "#70AD47", - "energy": "#70AD47", "homogeneity": "#70AD47"} + color_map = { + "fft": "#C00000", + "dct": "#C00000", + "hog": "#ED7D31", + "edge": "#ED7D31", + "lbp": "#70AD47", + "contrast": "#70AD47", + "correlation": "#70AD47", + "energy": "#70AD47", + "homogeneity": "#70AD47", + } bar_colors = [] for n in disp_names: c = "#4472C4" # default @@ -399,8 +263,7 @@ def generate_pdf(X, y, feature_names, results, ensemble_probs, ensemble_preds, ax = fig.add_axes([0, 0, 1, 1]) ax.axis("off") - ax.text(0.5, 0.95, "Architecture & Methodology", fontsize=14, - fontweight="bold", ha="center", fontfamily="serif", transform=ax.transAxes) + ax.text(0.5, 0.95, "Architecture & Methodology", fontsize=14, fontweight="bold", ha="center", fontfamily="serif", transform=ax.transAxes) method_text = ( "Multi-Signal Ensemble Architecture\n\n" @@ -440,36 +303,10 @@ def generate_pdf(X, y, feature_names, results, ensemble_probs, ensemble_preds, " [1] Li & Stamp, 'Detecting AI-generated Artwork', arXiv:2504.07078, 2025\n" " [2] negate project, github.com/darkshapes/negate" ) - ax.text(0.06, 0.9, method_text, fontsize=8.5, ha="left", va="top", fontfamily="serif", - transform=ax.transAxes) + ax.text(0.06, 0.9, method_text, fontsize=8.5, ha="left", va="top", fontfamily="serif", transform=ax.transAxes) pdf.savefig(fig) plt.close(fig) print(f"PDF saved: {pdf_path}") return pdf_path - - -def main(): - print("=" * 55) - print(" ARTWORK DETECTION - ENSEMBLE RESULTS") - print("=" * 55) - - X, y, names, imgs_h, imgs_a = load_and_extract() - print(f"Dataset: {np.sum(y==0)} human + {np.sum(y==1)} AI, {X.shape[1]} features") - - results, ens_probs, ens_preds, model = run_ensemble_cv(X, y) - - print(f"\n{'Model':<15} {'Acc':>8} {'Prec':>8} {'Rec':>8} {'F1':>8} {'AUC':>8}") - print("-" * 55) - for name, r in results.items(): - extra = f" ({r.get('n_abstained', '-')} abstained)" if 'n_abstained' in r else "" - print(f"{name:<15} {r['accuracy']:>7.1%} {r['precision']:>7.1%} {r['recall']:>7.1%} " - f"{r['f1']:>7.1%} {r['roc_auc']:>7.4f}{extra}") - - generate_pdf(X, y, names, results, ens_probs, ens_preds, model, imgs_h, imgs_a) - print("Done.") - - -if __name__ == "__main__": - main() diff --git a/results/20260403_224754/results_real_20260403_224754.json b/results/20260403_224754/results_real_20260403_224754.json new file mode 100644 index 0000000..cec4f95 --- /dev/null +++ b/results/20260403_224754/results_real_20260403_224754.json @@ -0,0 +1,3 @@ +{ + "x_p": "{'image_mean_ff': 0.5551752934617227, 'image_std': 0.0977445244532872, 'image_mean': (28894107043657, 57788214087314), 'diff_mean': (25353770906193, 50707541812387), 'laplace_mean': (24083985868529, 48167971737058), 'sobel_mean': (26436775610593, 52873551221187), 'image_tc': (16, 16), 'diff_tc': (29, 29), 'laplace_tc': (25, 25), 'sobel_tc': (25, 25), 'spectral_tc': (16, 16)}" +} \ No newline at end of file diff --git a/results/20260403_224959/results_real_20260403_224959.json b/results/20260403_224959/results_real_20260403_224959.json new file mode 100644 index 0000000..cec4f95 --- /dev/null +++ b/results/20260403_224959/results_real_20260403_224959.json @@ -0,0 +1,3 @@ +{ + "x_p": "{'image_mean_ff': 0.5551752934617227, 'image_std': 0.0977445244532872, 'image_mean': (28894107043657, 57788214087314), 'diff_mean': (25353770906193, 50707541812387), 'laplace_mean': (24083985868529, 48167971737058), 'sobel_mean': (26436775610593, 52873551221187), 'image_tc': (16, 16), 'diff_tc': (29, 29), 'laplace_tc': (25, 25), 'sobel_tc': (25, 25), 'spectral_tc': (16, 16)}" +} \ No newline at end of file diff --git a/results/20260403_225042/results_real_20260403_225042.json b/results/20260403_225042/results_real_20260403_225042.json new file mode 100644 index 0000000..cec4f95 --- /dev/null +++ b/results/20260403_225042/results_real_20260403_225042.json @@ -0,0 +1,3 @@ +{ + "x_p": "{'image_mean_ff': 0.5551752934617227, 'image_std': 0.0977445244532872, 'image_mean': (28894107043657, 57788214087314), 'diff_mean': (25353770906193, 50707541812387), 'laplace_mean': (24083985868529, 48167971737058), 'sobel_mean': (26436775610593, 52873551221187), 'image_tc': (16, 16), 'diff_tc': (29, 29), 'laplace_tc': (25, 25), 'sobel_tc': (25, 25), 'spectral_tc': (16, 16)}" +} \ No newline at end of file diff --git a/tests/generate_fair_eval_pdf.py b/tests/generate_fair_eval_pdf.py deleted file mode 100644 index cbe4af4..0000000 --- a/tests/generate_fair_eval_pdf.py +++ /dev/null @@ -1,262 +0,0 @@ -# SPDX-License-Identifier: MPL-2.0 AND LicenseRef-Commons-Clause-License-Condition-1.0 -"""Generate PDF report for fair evaluation results. - -Reads results/fair_evaluation_results.json and generates a timestamped PDF -with cross-validation metrics, comparison tables, and analysis. -""" - -from __future__ import annotations - -import json -import sys -from datetime import datetime -from pathlib import Path - -import matplotlib -matplotlib.use("Agg") -import matplotlib.pyplot as plt -from matplotlib.backends.backend_pdf import PdfPages -import matplotlib.gridspec as gridspec -import numpy as np - -RESULTS_DIR = Path(__file__).parent.parent / "results" - - -def generate_pdf(results_path: Path): - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - pdf_path = RESULTS_DIR / f"fair_evaluation_{timestamp}.pdf" - - with open(results_path) as f: - data = json.load(f) - - datasets = data["datasets"] - - with PdfPages(str(pdf_path)) as pdf: - # ===== PAGE 1: Title & Summary ===== - fig = plt.figure(figsize=(8.5, 11)) - fig.patch.set_facecolor("white") - ax = fig.add_axes([0, 0, 1, 1]) - ax.axis("off") - - ax.text(0.5, 0.88, "Fair Evaluation Report:\n49-Feature Artwork Detection", - transform=ax.transAxes, fontsize=20, fontweight="bold", - ha="center", va="top", fontfamily="serif") - - ax.text(0.5, 0.74, f"negate project — darkshapes\n{datetime.now().strftime('%B %d, %Y')}", - transform=ax.transAxes, fontsize=11, ha="center", va="top", - fontfamily="serif", style="italic") - - # Why this evaluation matters - rationale = ( - "Why This Evaluation Matters\n\n" - "Previous benchmarks used datasets where AI and genuine images had different\n" - "subject matter (cats vs bananas, WikiArt paintings vs generated illustrations).\n" - "This means the classifier could achieve high accuracy by learning content\n" - "differences rather than genuine AI artifacts.\n\n" - "This evaluation uses datasets where BOTH classes contain similar content:\n" - " - Hemg: 'AiArtData' vs 'RealArt' — both are artwork/art images\n" - " - Parveshiiii: balanced binary AI vs Real images\n\n" - "If our 49 features still achieve high accuracy on these datasets, it provides\n" - "stronger evidence that the features detect actual AI generation artifacts\n" - "rather than subject-matter shortcuts." - ) - ax.text(0.08, 0.64, rationale, transform=ax.transAxes, fontsize=9, - ha="left", va="top", fontfamily="serif", - bbox=dict(boxstyle="round,pad=0.5", facecolor="lightyellow", edgecolor="gray")) - - # Summary table - summary = "Results Summary\n\n" - for ds in datasets: - summary += ( - f"Dataset: {ds['dataset']}\n" - f" Samples: {ds['n_samples']} ({ds['n_samples']//2} per class)\n" - f" XGBoost: {ds['xgb_accuracy']:.1%} acc, {ds['xgb_auc']:.4f} AUC, " - f"{ds['xgb_precision']:.1%} prec, {ds['xgb_recall']:.1%} rec\n" - f" SVM: {ds['svm_accuracy']:.1%} acc, {ds['svm_auc']:.4f} AUC\n" - f" MLP: {ds['mlp_accuracy']:.1%} acc, {ds['mlp_auc']:.4f} AUC\n\n" - ) - ax.text(0.08, 0.28, summary, transform=ax.transAxes, fontsize=9, - ha="left", va="top", fontfamily="serif", - bbox=dict(boxstyle="round,pad=0.4", facecolor="#E8F5E9", edgecolor="#66BB6A")) - - pdf.savefig(fig) - plt.close(fig) - - # ===== PAGE 2+: Per-dataset details ===== - for ds in datasets: - fig = plt.figure(figsize=(8.5, 11)) - fig.patch.set_facecolor("white") - fig.suptitle(f"Dataset: {ds['dataset']}", fontsize=14, - fontweight="bold", fontfamily="serif", y=0.96) - - # Fold results table - ax_table = fig.add_axes([0.1, 0.68, 0.8, 0.22]) - ax_table.axis("off") - - if "xgb_folds" in ds: - table_data = [] - for r in ds["xgb_folds"]: - table_data.append([ - f"Fold {r['fold']}", f"{r['accuracy']:.2%}", - f"{r['precision']:.2%}", f"{r['recall']:.2%}", - f"{r['f1']:.2%}", f"{r['roc_auc']:.4f}" - ]) - - accs = [r["accuracy"] for r in ds["xgb_folds"]] - table_data.append([ - "Mean +/- Std", - f"{np.mean(accs):.2%} +/- {np.std(accs):.2%}", - "-", "-", "-", - f"{np.mean([r['roc_auc'] for r in ds['xgb_folds']]):.4f}" - ]) - - table = ax_table.table( - cellText=table_data, - colLabels=["Fold", "Accuracy", "Precision", "Recall", "F1", "ROC-AUC"], - loc="center", cellLoc="center", - ) - table.auto_set_font_size(False) - table.set_fontsize(8) - table.scale(1, 1.4) - for (row, col), cell in table.get_celld().items(): - if row == 0: - cell.set_facecolor("#4472C4") - cell.set_text_props(color="white", fontweight="bold") - elif row == len(table_data): - cell.set_facecolor("#D6E4F0") - - # Comparison bar chart: XGBoost vs SVM vs MLP - ax_bar = fig.add_axes([0.1, 0.35, 0.8, 0.25]) - models = ["XGBoost", "SVM", "MLP"] - accs = [ds["xgb_accuracy"], ds["svm_accuracy"], ds["mlp_accuracy"]] - aucs = [ds["xgb_auc"], ds["svm_auc"], ds["mlp_auc"]] - - x = np.arange(len(models)) - w = 0.35 - bars1 = ax_bar.bar(x - w/2, accs, w, label="Accuracy", color="#4472C4") - bars2 = ax_bar.bar(x + w/2, aucs, w, label="ROC-AUC", color="#ED7D31") - ax_bar.set_xticks(x) - ax_bar.set_xticklabels(models) - ax_bar.set_ylim(0, 1.1) - ax_bar.set_ylabel("Score") - ax_bar.set_title("Model Comparison", fontsize=11, fontfamily="serif") - ax_bar.legend() - ax_bar.grid(axis="y", alpha=0.3) - - for bar in bars1: - ax_bar.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.02, - f"{bar.get_height():.1%}", ha="center", fontsize=8) - for bar in bars2: - ax_bar.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.02, - f"{bar.get_height():.3f}", ha="center", fontsize=8) - - # Analysis text - ax_text = fig.add_axes([0.08, 0.05, 0.84, 0.25]) - ax_text.axis("off") - - best_acc = max(accs) - best_model = models[accs.index(best_acc)] - - analysis = ( - f"Analysis\n\n" - f"Dataset: {ds['repo']}\n" - f"Sample size: {ds['n_samples']} images, {ds['n_features']} features\n\n" - f"Best model: {best_model} at {best_acc:.1%} accuracy\n\n" - ) - if best_acc >= 0.80: - analysis += ( - "The features demonstrate strong discriminative power even when both\n" - "classes contain similar content. This suggests the 49 features capture\n" - "genuine AI generation artifacts rather than content-based shortcuts." - ) - elif best_acc >= 0.65: - analysis += ( - "Moderate discriminative power. The features capture some genuine AI\n" - "artifacts but performance degrades compared to content-separated datasets,\n" - "suggesting prior benchmarks partially relied on content differences." - ) - else: - analysis += ( - "Weak discriminative power on this dataset. The features struggle when\n" - "content is controlled, indicating prior high accuracy was largely driven\n" - "by subject-matter differences rather than AI detection capability." - ) - - ax_text.text(0, 1, analysis, transform=ax_text.transAxes, fontsize=9, - ha="left", va="top", fontfamily="serif") - - pdf.savefig(fig) - plt.close(fig) - - # ===== FINAL PAGE: Conclusions ===== - fig = plt.figure(figsize=(8.5, 11)) - fig.patch.set_facecolor("white") - ax = fig.add_axes([0, 0, 1, 1]) - ax.axis("off") - - ax.text(0.5, 0.92, "Conclusions", fontsize=16, fontweight="bold", - ha="center", va="top", fontfamily="serif", transform=ax.transAxes) - - all_accs = [ds["xgb_accuracy"] for ds in datasets] - mean_fair_acc = np.mean(all_accs) - - conclusions = ( - f"Mean XGBoost accuracy across fair datasets: {mean_fair_acc:.1%}\n\n" - "Comparison with previous (potentially confounded) benchmarks:\n" - " - Cats vs Bananas (unfair): ~91% accuracy\n" - " - WikiArt vs Generated (partially fair): ~92% accuracy\n" - f" - Fair evaluation (this report): {mean_fair_acc:.1%} accuracy\n\n" - ) - - if mean_fair_acc >= 0.80: - conclusions += ( - "CONCLUSION: The 49-feature pipeline holds up under fair evaluation.\n" - "The accuracy drop from unfair to fair benchmarks is modest, indicating\n" - "that the features genuinely detect AI artifacts, not just content.\n\n" - "The frequency-domain features (FFT/DCT) and texture features (GLCM/LBP)\n" - "appear to be capturing real structural differences between AI-generated\n" - "and human-created artwork." - ) - elif mean_fair_acc >= 0.65: - conclusions += ( - "CONCLUSION: Mixed results. The features have some genuine detection\n" - "capability but a significant portion of previous accuracy was from\n" - "content shortcuts. The pipeline needs improvement — likely deeper\n" - "learned features (self-supervised or fine-tuned ViT) rather than\n" - "hand-crafted statistics." - ) - else: - conclusions += ( - "CONCLUSION: The 49-feature pipeline does NOT generalize to fair\n" - "evaluation. Previous high accuracy was primarily from content confounds.\n" - "A fundamentally different approach is needed — likely self-supervised\n" - "learning of camera/generation-intrinsic features as described in\n" - "Zhong et al. (2026)." - ) - - conclusions += ( - "\n\nMethodological Note\n\n" - "This report uses 5-fold stratified cross-validation with 200 images per\n" - "class. While larger samples would give tighter confidence intervals, this\n" - "is sufficient to distinguish between >80% and chance-level performance.\n\n" - "Features: 49 total (39 from Li & Stamp 2025 + 10 FFT/DCT frequency features)\n" - "Classifiers: XGBoost, SVM (RBF kernel), MLP (100 hidden units)\n" - "All processing: CPU-only, no pretrained neural networks" - ) - - ax.text(0.08, 0.85, conclusions, transform=ax.transAxes, fontsize=9.5, - ha="left", va="top", fontfamily="serif") - - pdf.savefig(fig) - plt.close(fig) - - print(f"PDF saved to: {pdf_path}") - return pdf_path - - -if __name__ == "__main__": - results_path = RESULTS_DIR / "fair_evaluation_results.json" - if not results_path.exists(): - print(f"Run test_fair_evaluation.py first to generate {results_path}") - sys.exit(1) - generate_pdf(results_path) diff --git a/tests/test_artwork_accuracy.py b/tests/test_artwork_accuracy.py deleted file mode 100644 index 75ba27a..0000000 --- a/tests/test_artwork_accuracy.py +++ /dev/null @@ -1,210 +0,0 @@ -# SPDX-License-Identifier: MPL-2.0 AND LicenseRef-Commons-Clause-License-Condition-1.0 -# - -"""End-to-end accuracy benchmark using the 39-feature artwork extraction -pipeline from Li & Stamp (2025) "Detecting AI-generated Artwork". - -Downloads human artwork from WikiArt and AI-generated images, extracts -39 features (brightness, color, texture, shape, noise), trains SVM/MLP/XGBoost, -reports accuracy with 5-fold cross-validation. - -Run with: uv run pytest tests/test_artwork_accuracy.py -v -s -""" - -from __future__ import annotations - -import numpy as np -import pandas as pd -import pytest -import xgboost as xgb -from datasets import load_dataset, Image as HFImage -from PIL import Image -from sklearn.decomposition import PCA -from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, precision_score, recall_score -from sklearn.model_selection import StratifiedKFold, cross_val_score -from sklearn.neural_network import MLPClassifier -from sklearn.preprocessing import StandardScaler -from sklearn.svm import SVC -from tqdm import tqdm - -from negate.extract.feature_artwork import ArtworkExtract - -# Datasets -HUMAN_ART_REPO = "huggan/wikiart" # Human artwork (has style labels) -SYNTHETIC_REPO = "exdysa/nano-banana-pro-generated-1k-clone" # AI-generated -SAMPLE_SIZE = 100 # per class -N_FOLDS = 5 -SEED = 42 - - -@pytest.fixture(scope="module") -def benchmark_data(): - """Download images and extract 39 features for both classes.""" - print(f"\nDownloading {SAMPLE_SIZE} human art + {SAMPLE_SIZE} AI images...") - - # Human artwork from WikiArt - human_ds = load_dataset(HUMAN_ART_REPO, split=f"train[:{SAMPLE_SIZE}]") - human_ds = human_ds.cast_column("image", HFImage(decode=True, mode="RGB")) - - # AI-generated images - ai_ds = load_dataset(SYNTHETIC_REPO, split=f"train[:{SAMPLE_SIZE}]") - ai_ds = ai_ds.cast_column("image", HFImage(decode=True, mode="RGB")) - - extractor = ArtworkExtract() - features, labels = [], [] - - print("Extracting features from human artwork...") - for row in tqdm(human_ds, total=len(human_ds), desc="Human art"): - try: - feat = extractor(row["image"]) - features.append(feat) - labels.append(0) # genuine - except Exception as exc: - print(f" Skip: {exc}") - - print("Extracting features from AI images...") - for row in tqdm(ai_ds, total=len(ai_ds), desc="AI art"): - try: - feat = extractor(row["image"]) - features.append(feat) - labels.append(1) # synthetic - except Exception as exc: - print(f" Skip: {exc}") - - df = pd.DataFrame(features).fillna(0) - X = df.to_numpy(dtype=np.float64) - X = np.where(np.isfinite(X), X, 0) - y = np.array(labels) - - return { - "X": X, "y": y, - "feature_names": list(df.columns), - "n_human": int(np.sum(y == 0)), - "n_ai": int(np.sum(y == 1)), - } - - -@pytest.mark.slow -class TestArtworkDetection: - """Benchmark the paper's 39-feature approach on artwork detection.""" - - def test_feature_extraction(self, benchmark_data): - """Verify features extracted from both classes.""" - print(f"\n--- Dataset ---") - print(f"Human art: {benchmark_data['n_human']}") - print(f"AI art: {benchmark_data['n_ai']}") - print(f"Features: {benchmark_data['X'].shape[1]}") - assert benchmark_data["n_human"] >= 50 - assert benchmark_data["n_ai"] >= 50 - assert benchmark_data["X"].shape[1] == 49 - - def test_svm_cross_validation(self, benchmark_data): - """SVM with RBF kernel — paper's best binary model (97.9% reported).""" - X, y = benchmark_data["X"], benchmark_data["y"] - scaler = StandardScaler() - X_scaled = scaler.fit_transform(X) - - svm = SVC(C=10, gamma="scale", kernel="rbf", random_state=SEED, probability=True) - scores = cross_val_score(svm, X_scaled, y, cv=N_FOLDS, scoring="accuracy") - - print(f"\n--- SVM (RBF) {N_FOLDS}-Fold CV ---") - for i, s in enumerate(scores): - print(f" Fold {i+1}: {s:.2%}") - print(f" Mean: {scores.mean():.2%} +/- {scores.std():.2%}") - print(f" Paper reports: 97.9% (SVM binary)") - - def test_mlp_cross_validation(self, benchmark_data): - """MLP — paper's best multiclass model (82% reported).""" - X, y = benchmark_data["X"], benchmark_data["y"] - scaler = StandardScaler() - X_scaled = scaler.fit_transform(X) - - mlp = MLPClassifier( - hidden_layer_sizes=(100,), activation="relu", alpha=0.0001, - solver="adam", max_iter=1000, random_state=SEED, - ) - scores = cross_val_score(mlp, X_scaled, y, cv=N_FOLDS, scoring="accuracy") - - print(f"\n--- MLP {N_FOLDS}-Fold CV ---") - for i, s in enumerate(scores): - print(f" Fold {i+1}: {s:.2%}") - print(f" Mean: {scores.mean():.2%} +/- {scores.std():.2%}") - print(f" Paper reports: 97.6% (MLP binary)") - - def test_xgboost_cross_validation(self, benchmark_data): - """XGBoost — negate's existing classifier, now with paper's features.""" - X, y = benchmark_data["X"], benchmark_data["y"] - scaler = StandardScaler() - X_scaled = scaler.fit_transform(X) - - skf = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=SEED) - fold_accs, fold_aucs, fold_prec, fold_rec = [], [], [], [] - - for fold, (train_idx, test_idx) in enumerate(skf.split(X_scaled, y)): - X_train, X_test = X_scaled[train_idx], X_scaled[test_idx] - y_train, y_test = y[train_idx], y[test_idx] - - params = { - "objective": "binary:logistic", - "eval_metric": "logloss", - "max_depth": 4, - "learning_rate": 0.1, - "subsample": 0.8, - "colsample_bytree": 0.8, - "seed": SEED, - } - dtrain = xgb.DMatrix(X_train, label=y_train) - dtest = xgb.DMatrix(X_test, label=y_test) - model = xgb.train(params, dtrain, num_boost_round=200, - evals=[(dtest, "test")], early_stopping_rounds=10, - verbose_eval=False) - - y_prob = model.predict(dtest) - y_pred = (y_prob > 0.5).astype(int) - fold_accs.append(accuracy_score(y_test, y_pred)) - fold_aucs.append(roc_auc_score(y_test, y_prob)) - fold_prec.append(precision_score(y_test, y_pred, zero_division=0)) - fold_rec.append(recall_score(y_test, y_pred, zero_division=0)) - - print(f"\n--- XGBoost {N_FOLDS}-Fold CV ---") - for i, (acc, auc, p, r) in enumerate(zip(fold_accs, fold_aucs, fold_prec, fold_rec)): - print(f" Fold {i+1}: acc={acc:.2%} prec={p:.2%} rec={r:.2%} auc={auc:.4f}") - print(f" Mean: acc={np.mean(fold_accs):.2%} prec={np.mean(fold_prec):.2%} rec={np.mean(fold_rec):.2%} auc={np.mean(fold_aucs):.4f}") - - def test_comparison_summary(self, benchmark_data): - """Print comparison table of all models with precision and recall.""" - X, y = benchmark_data["X"], benchmark_data["y"] - scaler = StandardScaler() - X_scaled = scaler.fit_transform(X) - skf = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=SEED) - - model_results = {} - for name, clf in [ - ("SVM (RBF)", SVC(C=10, gamma="scale", kernel="rbf", random_state=SEED)), - ("MLP", MLPClassifier(hidden_layer_sizes=(100,), activation="relu", max_iter=1000, random_state=SEED)), - ]: - accs, precs, recs = [], [], [] - for train_idx, test_idx in skf.split(X_scaled, y): - clf_copy = type(clf)(**clf.get_params()) - clf_copy.fit(X_scaled[train_idx], y[train_idx]) - y_pred = clf_copy.predict(X_scaled[test_idx]) - accs.append(accuracy_score(y[test_idx], y_pred)) - precs.append(precision_score(y[test_idx], y_pred, zero_division=0)) - recs.append(recall_score(y[test_idx], y_pred, zero_division=0)) - model_results[name] = { - "acc": np.array(accs), "prec": np.array(precs), "rec": np.array(recs) - } - - print(f"\n{'='*75}") - print(f" ARTWORK DETECTION: MODEL COMPARISON") - print(f" 39 features (Li & Stamp 2025) | {len(y)} images") - print(f"{'='*75}") - print(f" {'Model':<15} {'Accuracy':>10} {'Precision':>11} {'Recall':>10} {'Paper Acc':>11}") - print(f" {'-'*57}") - for name, r in model_results.items(): - paper = {"SVM (RBF)": "97.9%", "MLP": "97.6%"}.get(name, "") - print(f" {name:<15} {r['acc'].mean():>9.2%} {r['prec'].mean():>10.2%} {r['rec'].mean():>9.2%} {paper:>11}") - print(f" {'Existing negate':<15} {'63.3%':>10} {'--':>11} {'--':>10} {'63.3%':>11}") - print(f"{'='*75}") - print(f"\n Precision = of images flagged as AI, how many actually are (false positive rate)") - print(f" Recall = of actual AI images, how many were caught (false negative rate)") diff --git a/tests/test_config.toml b/tests/test_config.toml index 969d6d0..619c7b8 100644 --- a/tests/test_config.toml +++ b/tests/test_config.toml @@ -56,3 +56,16 @@ n_components = 0.95 # Number of components for dimensionality reduction num_boost_round = 200 # Number of boosting rounds test_size = 0.2 # 80/20 training split default verbose_eval = 20 + +[ensemble] +sample_size = 100 +n_folds = 5 +abstain_threshold = 0.3 +svm_c = 10.0 +mlp_hidden_layers = 100 +mlp_activation = "relu" +mlp_max_iter = 1000 +cv = 3 +method = "sigmoid" +gamma = "scale" +kernel = "rbf" diff --git a/tests/test_experiments.py b/tests/test_experiments.py deleted file mode 100644 index f035c48..0000000 --- a/tests/test_experiments.py +++ /dev/null @@ -1,514 +0,0 @@ -# SPDX-License-Identifier: MPL-2.0 AND LicenseRef-Commons-Clause-License-Condition-1.0 -"""Run all feature experiments on Hemg art dataset and compare. - -Experiments: - 1. Artwork features only (49 features) — baseline - 2. Style features only (15 features) - 3. Artwork + Style combined (64 features) - 4. CLIP embeddings (768 features) - 5. CLIP + Artwork + Style (832 features) - -Each experiment: 4000 samples, 5-fold CV, XGBoost/SVM/MLP. -Generates a comparison PDF. -""" - -from __future__ import annotations - -import json -import sys -import time -from datetime import datetime -from pathlib import Path - -import matplotlib -matplotlib.use("Agg") -import matplotlib.pyplot as plt -from matplotlib.backends.backend_pdf import PdfPages -import matplotlib.gridspec as gridspec -import numpy as np -import pandas as pd -import torch -import xgboost as xgb -from datasets import load_dataset, Image as HFImage -from PIL import Image -from sklearn.metrics import ( - accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, - confusion_matrix, roc_curve, -) -from sklearn.model_selection import StratifiedKFold -from sklearn.neural_network import MLPClassifier -from sklearn.preprocessing import StandardScaler -from sklearn.svm import SVC -from tqdm import tqdm - -sys.path.insert(0, str(Path(__file__).parent.parent)) -from negate.extract.feature_artwork import ArtworkExtract -from negate.extract.feature_style import StyleExtract - -SEED = 42 -N_FOLDS = 5 -N_PER_CLASS = 2000 -REPO = "Hemg/AI-Generated-vs-Real-Images-Datasets" -RESULTS_DIR = Path(__file__).parent.parent / "results" - - -def load_dataset_cached(): - """Load and return the Hemg dataset.""" - print("Loading Hemg dataset...") - ds = load_dataset(REPO, split="train") - ds = ds.cast_column("image", HFImage(decode=True, mode="RGB")) - return ds - - -def extract_artwork_features(ds, indices) -> np.ndarray: - """Extract 49 artwork features.""" - extractor = ArtworkExtract() - features = [] - for idx in tqdm(indices, desc=" Artwork features"): - try: - img = ds[int(idx)]["image"] - if img and isinstance(img, Image.Image): - features.append(extractor(img)) - else: - features.append(None) - except Exception: - features.append(None) - df = pd.DataFrame([f for f in features if f is not None]).fillna(0) - X = df.to_numpy(dtype=np.float64) - return np.where(np.isfinite(X), X, 0), list(df.columns), [i for i, f in enumerate(features) if f is not None] - - -def extract_style_features(ds, indices) -> np.ndarray: - """Extract 15 style features.""" - extractor = StyleExtract() - features = [] - for idx in tqdm(indices, desc=" Style features"): - try: - img = ds[int(idx)]["image"] - if img and isinstance(img, Image.Image): - features.append(extractor(img)) - else: - features.append(None) - except Exception: - features.append(None) - df = pd.DataFrame([f for f in features if f is not None]).fillna(0) - X = df.to_numpy(dtype=np.float64) - return np.where(np.isfinite(X), X, 0), list(df.columns), [i for i, f in enumerate(features) if f is not None] - - -def extract_clip_features(ds, indices) -> np.ndarray: - """Extract CLIP ViT-B/32 embeddings (512-d).""" - from transformers import CLIPProcessor, CLIPModel - - print(" Loading CLIP model...") - model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32") - processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32") - model.eval() - - device = "cuda" if torch.cuda.is_available() else "cpu" - model = model.to(device) - - features = [] - valid = [] - batch_size = 32 - - for batch_start in tqdm(range(0, len(indices), batch_size), desc=" CLIP features"): - batch_indices = indices[batch_start:batch_start + batch_size] - images = [] - batch_valid = [] - for i, idx in enumerate(batch_indices): - try: - img = ds[int(idx)]["image"] - if img and isinstance(img, Image.Image): - images.append(img) - batch_valid.append(batch_start + i) - except Exception: - pass - - if not images: - continue - - with torch.no_grad(): - inputs = processor(images=images, return_tensors="pt", padding=True).to(device) - outputs = model.get_image_features(**inputs) - if isinstance(outputs, torch.Tensor): - embeddings = outputs.cpu().numpy() - else: - embeddings = outputs.pooler_output.cpu().numpy() - - features.append(embeddings) - valid.extend(batch_valid) - - X = np.vstack(features) - return X, [f"clip_{i}" for i in range(X.shape[1])], valid - - -def run_cv(X, y, model_type="xgb"): - """5-fold CV, return metrics dict.""" - skf = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=SEED) - all_true, all_prob = [], [] - - for train_idx, test_idx in skf.split(X, y): - X_train, X_test = X[train_idx], X[test_idx] - y_train, y_test = y[train_idx], y[test_idx] - - if model_type == "xgb": - spw = np.sum(y_train == 0) / max(np.sum(y_train == 1), 1) - params = { - "objective": "binary:logistic", "eval_metric": "logloss", - "max_depth": 5, "learning_rate": 0.1, "subsample": 0.8, - "colsample_bytree": 0.8, "scale_pos_weight": spw, "seed": SEED, - } - dtrain = xgb.DMatrix(X_train, label=y_train) - dtest = xgb.DMatrix(X_test, label=y_test) - model = xgb.train(params, dtrain, num_boost_round=300, - evals=[(dtest, "test")], early_stopping_rounds=15, - verbose_eval=False) - y_prob = model.predict(dtest) - elif model_type == "svm": - scaler = StandardScaler() - X_tr = scaler.fit_transform(X_train) - X_te = scaler.transform(X_test) - svm = SVC(kernel="rbf", probability=True, random_state=SEED) - svm.fit(X_tr, y_train) - y_prob = svm.predict_proba(X_te)[:, 1] - elif model_type == "mlp": - scaler = StandardScaler() - X_tr = scaler.fit_transform(X_train) - X_te = scaler.transform(X_test) - mlp = MLPClassifier(hidden_layer_sizes=(256, 128), max_iter=1000, - random_state=SEED, early_stopping=True) - mlp.fit(X_tr, y_train) - y_prob = mlp.predict_proba(X_te)[:, 1] - - all_true.extend(y_test) - all_prob.extend(y_prob) - - y_true = np.array(all_true) - y_prob = np.array(all_prob) - y_pred = (y_prob > 0.5).astype(int) - - return { - "accuracy": float(accuracy_score(y_true, y_pred)), - "precision": float(precision_score(y_true, y_pred, zero_division=0)), - "recall": float(recall_score(y_true, y_pred, zero_division=0)), - "f1": float(f1_score(y_true, y_pred, average="macro")), - "roc_auc": float(roc_auc_score(y_true, y_prob)), - "y_true": y_true.tolist(), - "y_prob": y_prob.tolist(), - } - - -def generate_pdf(experiments): - """Generate comparison PDF.""" - RESULTS_DIR.mkdir(exist_ok=True) - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - pdf_path = RESULTS_DIR / f"experiments_comparison_{timestamp}.pdf" - - with PdfPages(str(pdf_path)) as pdf: - # PAGE 1: Title + comparison chart - fig = plt.figure(figsize=(8.5, 11)) - fig.patch.set_facecolor("white") - - fig.suptitle("Feature Experiment Comparison\nfor AI Artwork Detection", - fontsize=18, fontweight="bold", fontfamily="serif", y=0.96) - fig.text(0.5, 0.89, f"negate project — darkshapes — {datetime.now().strftime('%B %d, %Y')}", - fontsize=10, ha="center", fontfamily="serif", style="italic") - fig.text(0.5, 0.86, f"Dataset: Hemg AI-Art vs Real-Art | {N_PER_CLASS*2} samples | 5-fold CV", - fontsize=9, ha="center", fontfamily="serif") - - # Grouped bar chart: accuracy by experiment and model - ax = fig.add_axes([0.1, 0.45, 0.8, 0.35]) - - exp_names = [e["name"] for e in experiments] - n_exp = len(exp_names) - x = np.arange(n_exp) - w = 0.25 - - for i, (model, color) in enumerate([("xgb", "#4472C4"), ("svm", "#ED7D31"), ("mlp", "#70AD47")]): - accs = [e["results"][model]["accuracy"] for e in experiments] - bars = ax.bar(x + i * w - w, accs, w, label=model.upper(), color=color) - for bar in bars: - ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.005, - f"{bar.get_height():.1%}", ha="center", fontsize=6.5, rotation=45) - - ax.set_xticks(x) - ax.set_xticklabels([e["short_name"] for e in experiments], fontsize=8, rotation=15, ha="right") - ax.set_ylabel("Accuracy", fontsize=10) - ax.set_title("Accuracy by Feature Set and Model", fontsize=12, fontfamily="serif") - ax.legend(fontsize=9) - ax.set_ylim(0.5, 1.0) - ax.grid(axis="y", alpha=0.3) - - # Summary table - ax_table = fig.add_axes([0.05, 0.08, 0.9, 0.3]) - ax_table.axis("off") - - table_data = [] - for e in experiments: - best_model = max(e["results"], key=lambda m: e["results"][m]["accuracy"]) - best = e["results"][best_model] - table_data.append([ - e["short_name"], - str(e["n_features"]), - f"{best['accuracy']:.2%}", - f"{best['precision']:.2%}", - f"{best['recall']:.2%}", - f"{best['roc_auc']:.4f}", - best_model.upper(), - e.get("extract_time", "?"), - ]) - - table = ax_table.table( - cellText=table_data, - colLabels=["Features", "Count", "Best Acc", "Prec", "Recall", "AUC", "Model", "Time"], - loc="center", cellLoc="center", - ) - table.auto_set_font_size(False) - table.set_fontsize(7.5) - table.scale(1, 1.5) - for (row, col), cell in table.get_celld().items(): - if row == 0: - cell.set_facecolor("#4472C4") - cell.set_text_props(color="white", fontweight="bold") - - pdf.savefig(fig) - plt.close(fig) - - # PAGE 2: ROC curves - fig = plt.figure(figsize=(8.5, 11)) - fig.patch.set_facecolor("white") - fig.suptitle("ROC Curves by Experiment (Best Model)", fontsize=14, - fontweight="bold", fontfamily="serif", y=0.96) - - colors = ["#4472C4", "#ED7D31", "#70AD47", "#FFC000", "#9B59B6"] - ax = fig.add_axes([0.12, 0.5, 0.76, 0.38]) - - for i, e in enumerate(experiments): - best_model = max(e["results"], key=lambda m: e["results"][m]["roc_auc"]) - r = e["results"][best_model] - fpr, tpr, _ = roc_curve(r["y_true"], r["y_prob"]) - ax.plot(fpr, tpr, color=colors[i % len(colors)], linewidth=2, - label=f"{e['short_name']} (AUC={r['roc_auc']:.3f})") - - ax.plot([0, 1], [0, 1], "k--", alpha=0.3) - ax.set_xlabel("False Positive Rate") - ax.set_ylabel("True Positive Rate") - ax.legend(fontsize=8, loc="lower right") - ax.grid(True, alpha=0.3) - - # Analysis text - ax_text = fig.add_axes([0.08, 0.05, 0.84, 0.38]) - ax_text.axis("off") - - # Find best and worst - best_exp = max(experiments, key=lambda e: max(e["results"][m]["accuracy"] for m in e["results"])) - worst_exp = min(experiments, key=lambda e: max(e["results"][m]["accuracy"] for m in e["results"])) - best_acc = max(best_exp["results"][m]["accuracy"] for m in best_exp["results"]) - worst_acc = max(worst_exp["results"][m]["accuracy"] for m in worst_exp["results"]) - - analysis = ( - "Analysis\n\n" - f"Best performing: {best_exp['name']} at {best_acc:.1%}\n" - f"Worst performing: {worst_exp['name']} at {worst_acc:.1%}\n" - f"Improvement from best to worst: {(best_acc - worst_acc)*100:+.1f}pp\n\n" - ) - - # Check if CLIP exists - clip_exp = [e for e in experiments if "clip" in e["short_name"].lower()] - art_exp = [e for e in experiments if e["short_name"] == "Artwork (49)"] - - if clip_exp and art_exp: - clip_acc = max(clip_exp[0]["results"][m]["accuracy"] for m in clip_exp[0]["results"]) - art_acc = max(art_exp[0]["results"][m]["accuracy"] for m in art_exp[0]["results"]) - analysis += ( - f"CLIP vs hand-crafted: {clip_acc:.1%} vs {art_acc:.1%} " - f"({(clip_acc - art_acc)*100:+.1f}pp)\n" - ) - if clip_acc > art_acc + 0.03: - analysis += "Learned features significantly outperform hand-crafted features.\n" - elif clip_acc < art_acc - 0.03: - analysis += "Surprisingly, hand-crafted features outperform CLIP on this task.\n" - else: - analysis += "Learned and hand-crafted features perform similarly.\n" - - # Check if combined helps - combined_exp = [e for e in experiments if "+" in e["short_name"]] - if combined_exp: - comb_acc = max(combined_exp[-1]["results"][m]["accuracy"] for m in combined_exp[-1]["results"]) - analysis += ( - f"\nCombined features: {comb_acc:.1%}\n" - ) - if comb_acc > best_acc - 0.01: - analysis += "Combining features achieves the best overall performance.\n" - else: - analysis += "Combining features does not improve over the best individual set.\n" - - analysis += ( - "\nConclusions\n\n" - "This comparison tests whether:\n" - " 1. Style-specific craft features add signal beyond generic statistics\n" - " 2. Learned representations (CLIP) outperform hand-crafted features\n" - " 3. Combining multiple feature types improves detection\n\n" - "All experiments use the same dataset (Hemg AI Art vs Real Art),\n" - "same sample size, and same evaluation methodology.\n" - ) - - ax_text.text(0, 1, analysis, transform=ax_text.transAxes, fontsize=9, - ha="left", va="top", fontfamily="serif") - - pdf.savefig(fig) - plt.close(fig) - - print(f"PDF saved to: {pdf_path}") - return pdf_path - - -def main(): - print("=" * 60) - print(" FEATURE EXPERIMENTS COMPARISON") - print(" Dataset: Hemg AI Art vs Real Art") - print(f" Samples: {N_PER_CLASS * 2} ({N_PER_CLASS} per class)") - print("=" * 60) - - ds = load_dataset_cached() - all_labels = ds["label"] - - # Select balanced indices - rng = np.random.RandomState(SEED) - idx_0 = [i for i, l in enumerate(all_labels) if l == 0] - idx_1 = [i for i, l in enumerate(all_labels) if l == 1] - chosen_0 = rng.choice(idx_0, size=N_PER_CLASS, replace=False) - chosen_1 = rng.choice(idx_1, size=N_PER_CLASS, replace=False) - all_indices = np.concatenate([chosen_0, chosen_1]) - # Labels: 0=AI(synthetic), 1=Real(genuine) in dataset - # We want: 0=genuine, 1=synthetic - y = np.array([1] * N_PER_CLASS + [0] * N_PER_CLASS) - - experiments = [] - - # === Experiment 1: Artwork features (49) === - print("\n" + "=" * 50) - print(" Experiment 1: Artwork Features (49)") - print("=" * 50) - t0 = time.time() - X_art, art_names, art_valid = extract_artwork_features(ds, all_indices) - t_art = f"{time.time() - t0:.0f}s" - y_art = y[art_valid] - print(f" {X_art.shape[0]} images, {X_art.shape[1]} features, {t_art}") - - exp1 = {"name": "Artwork Features (Li & Stamp + FFT/DCT)", "short_name": "Artwork (49)", - "n_features": X_art.shape[1], "extract_time": t_art, "results": {}} - for model in ["xgb", "svm", "mlp"]: - print(f" {model.upper()}...") - exp1["results"][model] = run_cv(X_art, y_art, model) - print(f" acc={exp1['results'][model]['accuracy']:.2%}") - experiments.append(exp1) - - # === Experiment 2: Style features (15) === - print("\n" + "=" * 50) - print(" Experiment 2: Style Features (15)") - print("=" * 50) - t0 = time.time() - X_style, style_names, style_valid = extract_style_features(ds, all_indices) - t_style = f"{time.time() - t0:.0f}s" - y_style = y[style_valid] - print(f" {X_style.shape[0]} images, {X_style.shape[1]} features, {t_style}") - - exp2 = {"name": "Style Features (stroke/palette/composition/texture)", "short_name": "Style (15)", - "n_features": X_style.shape[1], "extract_time": t_style, "results": {}} - for model in ["xgb", "svm", "mlp"]: - print(f" {model.upper()}...") - exp2["results"][model] = run_cv(X_style, y_style, model) - print(f" acc={exp2['results'][model]['accuracy']:.2%}") - experiments.append(exp2) - - # === Experiment 3: Artwork + Style combined (64) === - print("\n" + "=" * 50) - print(" Experiment 3: Artwork + Style Combined (64)") - print("=" * 50) - # Align valid indices - common_valid = sorted(set(art_valid) & set(style_valid)) - art_mask = [art_valid.index(v) for v in common_valid] - style_mask = [style_valid.index(v) for v in common_valid] - X_combined = np.hstack([X_art[art_mask], X_style[style_mask]]) - y_combined = y[common_valid] - print(f" {X_combined.shape[0]} images, {X_combined.shape[1]} features") - - exp3 = {"name": "Artwork + Style Combined", "short_name": "Art+Style (64)", - "n_features": X_combined.shape[1], "extract_time": "combined", "results": {}} - for model in ["xgb", "svm", "mlp"]: - print(f" {model.upper()}...") - exp3["results"][model] = run_cv(X_combined, y_combined, model) - print(f" acc={exp3['results'][model]['accuracy']:.2%}") - experiments.append(exp3) - - # === Experiment 4: CLIP embeddings (512) === - print("\n" + "=" * 50) - print(" Experiment 4: CLIP ViT-B/32 Embeddings (512)") - print("=" * 50) - t0 = time.time() - X_clip, clip_names, clip_valid = extract_clip_features(ds, all_indices) - t_clip = f"{time.time() - t0:.0f}s" - y_clip = y[clip_valid] - print(f" {X_clip.shape[0]} images, {X_clip.shape[1]} features, {t_clip}") - - exp4 = {"name": "CLIP ViT-B/32 Embeddings", "short_name": "CLIP (512)", - "n_features": X_clip.shape[1], "extract_time": t_clip, "results": {}} - for model in ["xgb", "svm", "mlp"]: - print(f" {model.upper()}...") - exp4["results"][model] = run_cv(X_clip, y_clip, model) - print(f" acc={exp4['results'][model]['accuracy']:.2%}") - experiments.append(exp4) - - # === Experiment 5: CLIP + Artwork + Style (all combined) === - print("\n" + "=" * 50) - print(" Experiment 5: CLIP + Artwork + Style (all)") - print("=" * 50) - common_all = sorted(set(art_valid) & set(style_valid) & set(clip_valid)) - art_m = [art_valid.index(v) for v in common_all] - style_m = [style_valid.index(v) for v in common_all] - clip_m = [clip_valid.index(v) for v in common_all] - X_all = np.hstack([X_art[art_m], X_style[style_m], X_clip[clip_m]]) - y_all = y[common_all] - print(f" {X_all.shape[0]} images, {X_all.shape[1]} features") - - exp5 = {"name": "CLIP + Artwork + Style (Everything)", "short_name": "All Combined", - "n_features": X_all.shape[1], "extract_time": "combined", "results": {}} - for model in ["xgb", "svm", "mlp"]: - print(f" {model.upper()}...") - exp5["results"][model] = run_cv(X_all, y_all, model) - print(f" acc={exp5['results'][model]['accuracy']:.2%}") - experiments.append(exp5) - - # Save results (without y_true/y_prob arrays for JSON) - json_results = [] - for e in experiments: - je = {k: v for k, v in e.items() if k != "results"} - je["results"] = {} - for m, r in e["results"].items(): - je["results"][m] = {k: v for k, v in r.items() if k not in ("y_true", "y_prob")} - json_results.append(je) - - RESULTS_DIR.mkdir(exist_ok=True) - json_path = RESULTS_DIR / "experiments_results.json" - with open(json_path, "w") as f: - json.dump({"timestamp": datetime.now().isoformat(), "experiments": json_results}, f, indent=2) - print(f"\nJSON saved to: {json_path}") - - # Generate PDF - print("\nGenerating comparison PDF...") - generate_pdf(experiments) - - # Final summary - print(f"\n{'='*60}") - print(" FINAL SUMMARY") - print(f"{'='*60}") - for e in experiments: - best_model = max(e["results"], key=lambda m: e["results"][m]["accuracy"]) - best = e["results"][best_model] - print(f" {e['short_name']:20s} acc={best['accuracy']:.2%} auc={best['roc_auc']:.4f} ({best_model})") - - -if __name__ == "__main__": - main() diff --git a/tests/test_fair_evaluation.py b/tests/test_fair_evaluation.py deleted file mode 100644 index e3d1381..0000000 --- a/tests/test_fair_evaluation.py +++ /dev/null @@ -1,295 +0,0 @@ -# SPDX-License-Identifier: MPL-2.0 AND LicenseRef-Commons-Clause-License-Condition-1.0 -"""Fair evaluation: test artwork features on datasets where both classes are art. - -Addresses the confound that previous benchmarks used different subject matter -(cats vs bananas, wikiart vs generated), which inflates accuracy. - -Datasets: - 1. Hemg/AI-Generated-vs-Real-Images-Datasets — 153K, "AiArtData" vs "RealArt" - 2. Parveshiiii/AI-vs-Real — 14K balanced binary - -We sample N images from each class, extract 49 features, run 5-fold CV, -and report accuracy/precision/recall/F1/AUC with confidence intervals. -""" - -from __future__ import annotations - -import json -import sys -from datetime import datetime -from pathlib import Path - -import numpy as np -import pandas as pd -import xgboost as xgb -from datasets import load_dataset, Image as HFImage -from PIL import Image -from sklearn.metrics import ( - accuracy_score, f1_score, roc_auc_score, precision_score, recall_score, - confusion_matrix, roc_curve, -) -from sklearn.model_selection import StratifiedKFold -from sklearn.neural_network import MLPClassifier -from sklearn.preprocessing import StandardScaler -from sklearn.svm import SVC -from tqdm import tqdm - -sys.path.insert(0, str(Path(__file__).parent.parent)) -from negate.extract.feature_artwork import ArtworkExtract - -SEED = 42 -N_FOLDS = 5 -RESULTS_DIR = Path(__file__).parent.parent / "results" - - -def extract_all_features(dataset, label_col: str, n_samples: int = 200): - """Extract features from a dataset, balanced per class.""" - extractor = ArtworkExtract() - features, labels, errors = [], [], 0 - - # Get unique labels and sample equally - all_labels = dataset[label_col] - unique_labels = sorted(set(all_labels)) - print(f" Labels found: {unique_labels}") - - per_class = n_samples // len(unique_labels) - - for lbl in unique_labels: - indices = [i for i, l in enumerate(all_labels) if l == lbl] - rng = np.random.RandomState(SEED) - chosen = rng.choice(indices, size=min(per_class, len(indices)), replace=False) - - for idx in tqdm(chosen, desc=f" Class {lbl}"): - try: - img = dataset[int(idx)]["image"] - if img is None: - errors += 1 - continue - if not isinstance(img, Image.Image): - errors += 1 - continue - feat = extractor(img) - features.append(feat) - # Binary: 0 = real/genuine, 1 = AI/synthetic - labels.append(0 if lbl == max(unique_labels) else 1) - except Exception as e: - errors += 1 - - print(f" Extracted {len(features)} images ({errors} errors)") - - df = pd.DataFrame(features).fillna(0) - X = df.to_numpy(dtype=np.float64) - X = np.where(np.isfinite(X), X, 0) - y = np.array(labels) - - return X, y, list(df.columns) - - -def cross_validate_xgb(X, y): - """5-fold CV with XGBoost.""" - skf = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=SEED) - all_true, all_prob = [], [] - fold_results = [] - - for fold, (train_idx, test_idx) in enumerate(skf.split(X, y)): - X_train, X_test = X[train_idx], X[test_idx] - y_train, y_test = y[train_idx], y[test_idx] - - spw = np.sum(y_train == 0) / max(np.sum(y_train == 1), 1) - params = { - "objective": "binary:logistic", - "eval_metric": ["logloss", "aucpr"], - "max_depth": 4, - "learning_rate": 0.1, - "subsample": 0.8, - "colsample_bytree": 0.8, - "scale_pos_weight": spw, - "seed": SEED, - } - dtrain = xgb.DMatrix(X_train, label=y_train) - dtest = xgb.DMatrix(X_test, label=y_test) - model = xgb.train(params, dtrain, num_boost_round=200, - evals=[(dtest, "test")], early_stopping_rounds=10, - verbose_eval=False) - - y_prob = model.predict(dtest) - y_pred = (y_prob > 0.5).astype(int) - - fold_results.append({ - "fold": fold + 1, - "accuracy": float(accuracy_score(y_test, y_pred)), - "precision": float(precision_score(y_test, y_pred, zero_division=0)), - "recall": float(recall_score(y_test, y_pred, zero_division=0)), - "f1": float(f1_score(y_test, y_pred, average="macro")), - "roc_auc": float(roc_auc_score(y_test, y_prob)), - }) - all_true.extend(y_test) - all_prob.extend(y_prob) - - return fold_results, np.array(all_true), np.array(all_prob) - - -def cross_validate_svm(X, y): - """5-fold CV with SVM.""" - skf = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=SEED) - all_true, all_prob = [], [] - - scaler = StandardScaler() - - for fold, (train_idx, test_idx) in enumerate(skf.split(X, y)): - X_train = scaler.fit_transform(X[train_idx]) - X_test = scaler.transform(X[test_idx]) - y_train, y_test = y[train_idx], y[test_idx] - - svm = SVC(kernel="rbf", probability=True, random_state=SEED) - svm.fit(X_train, y_train) - y_prob = svm.predict_proba(X_test)[:, 1] - all_true.extend(y_test) - all_prob.extend(y_prob) - - return np.array(all_true), np.array(all_prob) - - -def cross_validate_mlp(X, y): - """5-fold CV with MLP.""" - skf = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=SEED) - all_true, all_prob = [], [] - scaler = StandardScaler() - - for fold, (train_idx, test_idx) in enumerate(skf.split(X, y)): - X_train = scaler.fit_transform(X[train_idx]) - X_test = scaler.transform(X[test_idx]) - y_train, y_test = y[train_idx], y[test_idx] - - mlp = MLPClassifier(hidden_layer_sizes=(100,), max_iter=1000, random_state=SEED) - mlp.fit(X_train, y_train) - y_prob = mlp.predict_proba(X_test)[:, 1] - all_true.extend(y_test) - all_prob.extend(y_prob) - - return np.array(all_true), np.array(all_prob) - - -def summarize(name, fold_results, y_true, y_prob): - """Print summary for a classifier.""" - y_pred = (y_prob > 0.5).astype(int) - accs = [r["accuracy"] for r in fold_results] - print(f"\n{'='*50}") - print(f" {name}") - print(f"{'='*50}") - for r in fold_results: - print(f" Fold {r['fold']}: acc={r['accuracy']:.2%} prec={r['precision']:.2%} " - f"rec={r['recall']:.2%} auc={r['roc_auc']:.4f}") - print(f" ---") - print(f" Mean acc: {np.mean(accs):.2%} +/- {np.std(accs):.2%}") - print(f" Pooled: acc={accuracy_score(y_true, y_pred):.2%} " - f"prec={precision_score(y_true, y_pred, zero_division=0):.2%} " - f"rec={recall_score(y_true, y_pred, zero_division=0):.2%} " - f"auc={roc_auc_score(y_true, y_prob):.4f}") - cm = confusion_matrix(y_true, y_pred) - print(f" Confusion: TN={cm[0,0]} FP={cm[0,1]} FN={cm[1,0]} TP={cm[1,1]}") - - -def evaluate_dataset(name: str, repo: str, label_col: str, n_samples: int, split: str = "train"): - """Full evaluation pipeline for one dataset.""" - print(f"\n{'#'*60}") - print(f" DATASET: {name}") - print(f" Repo: {repo}") - print(f" Sampling: {n_samples} images ({n_samples//2} per class)") - print(f"{'#'*60}") - - print(f"\nLoading dataset...") - ds = load_dataset(repo, split=split) - ds = ds.cast_column("image", HFImage(decode=True, mode="RGB")) - print(f" Total rows: {len(ds)}") - - X, y, feature_names = extract_all_features(ds, label_col, n_samples) - print(f" Class balance: {np.sum(y==0)} real, {np.sum(y==1)} synthetic") - print(f" Features: {X.shape[1]}") - - # XGBoost - print(f"\nRunning {N_FOLDS}-fold CV (XGBoost)...") - xgb_folds, xgb_true, xgb_prob = cross_validate_xgb(X, y) - summarize(f"XGBoost on {name}", xgb_folds, xgb_true, xgb_prob) - - # SVM - print(f"\nRunning {N_FOLDS}-fold CV (SVM)...") - svm_true, svm_prob = cross_validate_svm(X, y) - svm_pred = (svm_prob > 0.5).astype(int) - print(f" SVM pooled: acc={accuracy_score(svm_true, svm_pred):.2%} " - f"auc={roc_auc_score(svm_true, svm_prob):.4f}") - - # MLP - print(f"\nRunning {N_FOLDS}-fold CV (MLP)...") - mlp_true, mlp_prob = cross_validate_mlp(X, y) - mlp_pred = (mlp_prob > 0.5).astype(int) - print(f" MLP pooled: acc={accuracy_score(mlp_true, mlp_pred):.2%} " - f"auc={roc_auc_score(mlp_true, mlp_prob):.4f}") - - return { - "dataset": name, - "repo": repo, - "n_samples": int(np.sum(y >= 0)), - "n_features": X.shape[1], - "xgb_folds": xgb_folds, - "xgb_accuracy": float(accuracy_score(xgb_true, (xgb_prob > 0.5).astype(int))), - "xgb_auc": float(roc_auc_score(xgb_true, xgb_prob)), - "xgb_precision": float(precision_score(xgb_true, (xgb_prob > 0.5).astype(int), zero_division=0)), - "xgb_recall": float(recall_score(xgb_true, (xgb_prob > 0.5).astype(int), zero_division=0)), - "svm_accuracy": float(accuracy_score(svm_true, svm_pred)), - "svm_auc": float(roc_auc_score(svm_true, svm_prob)), - "mlp_accuracy": float(accuracy_score(mlp_true, mlp_pred)), - "mlp_auc": float(roc_auc_score(mlp_true, mlp_prob)), - "feature_names": feature_names, - } - - -def main(): - print("=" * 60) - print(" FAIR EVALUATION: 49-Feature Artwork Detection") - print(" Testing on semantically-similar datasets") - print("=" * 60) - - results = [] - - # Dataset 1: Hemg — both classes are art - results.append(evaluate_dataset( - name="AI-Art vs Real-Art (Hemg)", - repo="Hemg/AI-Generated-vs-Real-Images-Datasets", - label_col="label", - n_samples=400, - )) - - # Dataset 2: Parveshiiii — balanced binary - results.append(evaluate_dataset( - name="AI vs Real (Parveshiiii)", - repo="Parveshiiii/AI-vs-Real", - label_col="binary_label", - n_samples=400, - )) - - # Save results - RESULTS_DIR.mkdir(exist_ok=True) - out_path = RESULTS_DIR / "fair_evaluation_results.json" - with open(out_path, "w") as f: - json.dump({ - "timestamp": datetime.now().isoformat(), - "evaluation": "fair_semantically_similar", - "datasets": results, - }, f, indent=2, default=str) - - print(f"\n{'='*60}") - print(f" RESULTS SUMMARY") - print(f"{'='*60}") - for r in results: - print(f"\n {r['dataset']}:") - print(f" XGBoost: acc={r['xgb_accuracy']:.2%} auc={r['xgb_auc']:.4f} " - f"prec={r['xgb_precision']:.2%} rec={r['xgb_recall']:.2%}") - print(f" SVM: acc={r['svm_accuracy']:.2%} auc={r['svm_auc']:.4f}") - print(f" MLP: acc={r['mlp_accuracy']:.2%} auc={r['mlp_auc']:.4f}") - - print(f"\nResults saved to: {out_path}") - - -if __name__ == "__main__": - main() diff --git a/tests/test_scale_evaluation.py b/tests/test_scale_evaluation.py deleted file mode 100644 index df9bbfe..0000000 --- a/tests/test_scale_evaluation.py +++ /dev/null @@ -1,363 +0,0 @@ -# SPDX-License-Identifier: MPL-2.0 AND LicenseRef-Commons-Clause-License-Condition-1.0 -"""Scale evaluation: test if more training data improves artwork detection. - -Runs the 49-feature pipeline on increasing sample sizes from Hemg (art vs art) -to determine if 71% accuracy is a data problem or a feature problem. -""" - -from __future__ import annotations - -import json -import sys -from datetime import datetime -from pathlib import Path - -import matplotlib -matplotlib.use("Agg") -import matplotlib.pyplot as plt -from matplotlib.backends.backend_pdf import PdfPages -import numpy as np -import pandas as pd -import xgboost as xgb -from datasets import load_dataset, Image as HFImage -from PIL import Image -from sklearn.metrics import ( - accuracy_score, f1_score, roc_auc_score, precision_score, recall_score, - confusion_matrix, -) -from sklearn.model_selection import StratifiedKFold -from sklearn.neural_network import MLPClassifier -from sklearn.preprocessing import StandardScaler -from sklearn.svm import SVC -from tqdm import tqdm - -sys.path.insert(0, str(Path(__file__).parent.parent)) -from negate.extract.feature_artwork import ArtworkExtract - -SEED = 42 -N_FOLDS = 5 -REPO = "Hemg/AI-Generated-vs-Real-Images-Datasets" -SAMPLE_SIZES = [400, 1000, 2000, 4000] # total (half per class) -RESULTS_DIR = Path(__file__).parent.parent / "results" - - -def extract_features_cached(dataset, n_per_class: int, extractor: ArtworkExtract): - """Extract features, balanced per class.""" - all_labels = dataset["label"] - features, labels, errors = [], [], 0 - - rng = np.random.RandomState(SEED) - - for lbl in [0, 1]: - indices = [i for i, l in enumerate(all_labels) if l == lbl] - chosen = rng.choice(indices, size=min(n_per_class, len(indices)), replace=False) - - for idx in tqdm(chosen, desc=f" Label {lbl} (n={n_per_class})"): - try: - img = dataset[int(idx)]["image"] - if img is None or not isinstance(img, Image.Image): - errors += 1 - continue - feat = extractor(img) - features.append(feat) - # label 0 = AI art (synthetic), label 1 = Real art (genuine) - # We want: 0 = genuine, 1 = synthetic - labels.append(1 if lbl == 0 else 0) - except Exception: - errors += 1 - - print(f" Extracted {len(features)} ({errors} errors)") - df = pd.DataFrame(features).fillna(0) - X = df.to_numpy(dtype=np.float64) - X = np.where(np.isfinite(X), X, 0) - y = np.array(labels) - return X, y, list(df.columns) - - -def run_cv(X, y, model_type="xgb"): - """Run 5-fold CV, return pooled y_true, y_prob.""" - skf = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=SEED) - all_true, all_prob = [], [] - - for train_idx, test_idx in skf.split(X, y): - X_train, X_test = X[train_idx], X[test_idx] - y_train, y_test = y[train_idx], y[test_idx] - - if model_type == "xgb": - spw = np.sum(y_train == 0) / max(np.sum(y_train == 1), 1) - params = { - "objective": "binary:logistic", "eval_metric": "logloss", - "max_depth": 4, "learning_rate": 0.1, "subsample": 0.8, - "colsample_bytree": 0.8, "scale_pos_weight": spw, "seed": SEED, - } - dtrain = xgb.DMatrix(X_train, label=y_train) - dtest = xgb.DMatrix(X_test, label=y_test) - model = xgb.train(params, dtrain, num_boost_round=200, - evals=[(dtest, "test")], early_stopping_rounds=10, - verbose_eval=False) - y_prob = model.predict(dtest) - elif model_type == "svm": - scaler = StandardScaler() - X_tr = scaler.fit_transform(X_train) - X_te = scaler.transform(X_test) - svm = SVC(kernel="rbf", probability=True, random_state=SEED) - svm.fit(X_tr, y_train) - y_prob = svm.predict_proba(X_te)[:, 1] - elif model_type == "mlp": - scaler = StandardScaler() - X_tr = scaler.fit_transform(X_train) - X_te = scaler.transform(X_test) - mlp = MLPClassifier(hidden_layer_sizes=(128, 64), max_iter=1000, - random_state=SEED, early_stopping=True) - mlp.fit(X_tr, y_train) - y_prob = mlp.predict_proba(X_te)[:, 1] - - all_true.extend(y_test) - all_prob.extend(y_prob) - - y_true = np.array(all_true) - y_prob = np.array(all_prob) - y_pred = (y_prob > 0.5).astype(int) - - return { - "accuracy": float(accuracy_score(y_true, y_pred)), - "precision": float(precision_score(y_true, y_pred, zero_division=0)), - "recall": float(recall_score(y_true, y_pred, zero_division=0)), - "f1": float(f1_score(y_true, y_pred, average="macro")), - "roc_auc": float(roc_auc_score(y_true, y_prob)), - } - - -def generate_pdf(all_results): - """Generate scaling analysis PDF.""" - RESULTS_DIR.mkdir(exist_ok=True) - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - pdf_path = RESULTS_DIR / f"scale_evaluation_{timestamp}.pdf" - - with PdfPages(str(pdf_path)) as pdf: - # PAGE 1: Title + scaling curves - fig = plt.figure(figsize=(8.5, 11)) - fig.patch.set_facecolor("white") - - fig.suptitle("Scaling Analysis: Does More Data Improve\nArtwork Detection Accuracy?", - fontsize=16, fontweight="bold", fontfamily="serif", y=0.96) - - # Subtitle - fig.text(0.5, 0.90, f"negate project — darkshapes — {datetime.now().strftime('%B %d, %Y')}", - fontsize=10, ha="center", fontfamily="serif", style="italic") - - fig.text(0.5, 0.87, "Dataset: Hemg/AI-Generated-vs-Real-Images-Datasets (AI Art vs Real Art)", - fontsize=9, ha="center", fontfamily="serif") - - # Accuracy scaling curve - ax1 = fig.add_axes([0.12, 0.52, 0.76, 0.3]) - sizes = [r["total"] for r in all_results] - - for model, color, marker in [("xgb", "#4472C4", "o"), ("svm", "#ED7D31", "s"), ("mlp", "#70AD47", "^")]: - accs = [r[model]["accuracy"] for r in all_results] - ax1.plot(sizes, accs, f"-{marker}", color=color, label=model.upper(), markersize=8, linewidth=2) - for x, y in zip(sizes, accs): - ax1.annotate(f"{y:.1%}", (x, y), textcoords="offset points", - xytext=(0, 10), ha="center", fontsize=8) - - ax1.set_xlabel("Total Training Samples", fontsize=10) - ax1.set_ylabel("5-Fold CV Accuracy", fontsize=10) - ax1.set_title("Accuracy vs Training Set Size", fontsize=12, fontfamily="serif") - ax1.legend(fontsize=9) - ax1.grid(True, alpha=0.3) - ax1.set_ylim(0.5, 1.0) - ax1.axhline(y=0.5, color="red", linestyle="--", alpha=0.3, label="Random chance") - - # AUC scaling curve - ax2 = fig.add_axes([0.12, 0.12, 0.76, 0.3]) - - for model, color, marker in [("xgb", "#4472C4", "o"), ("svm", "#ED7D31", "s"), ("mlp", "#70AD47", "^")]: - aucs = [r[model]["roc_auc"] for r in all_results] - ax2.plot(sizes, aucs, f"-{marker}", color=color, label=model.upper(), markersize=8, linewidth=2) - for x, y in zip(sizes, aucs): - ax2.annotate(f"{y:.3f}", (x, y), textcoords="offset points", - xytext=(0, 10), ha="center", fontsize=8) - - ax2.set_xlabel("Total Training Samples", fontsize=10) - ax2.set_ylabel("5-Fold CV ROC-AUC", fontsize=10) - ax2.set_title("ROC-AUC vs Training Set Size", fontsize=12, fontfamily="serif") - ax2.legend(fontsize=9) - ax2.grid(True, alpha=0.3) - ax2.set_ylim(0.5, 1.0) - - pdf.savefig(fig) - plt.close(fig) - - # PAGE 2: Results table + analysis - fig = plt.figure(figsize=(8.5, 11)) - fig.patch.set_facecolor("white") - fig.suptitle("Detailed Results & Analysis", fontsize=14, - fontweight="bold", fontfamily="serif", y=0.96) - - # Results table - ax_table = fig.add_axes([0.05, 0.62, 0.9, 0.28]) - ax_table.axis("off") - - table_data = [] - for r in all_results: - for model in ["xgb", "svm", "mlp"]: - m = r[model] - table_data.append([ - str(r["total"]), model.upper(), - f"{m['accuracy']:.2%}", f"{m['precision']:.2%}", - f"{m['recall']:.2%}", f"{m['f1']:.2%}", f"{m['roc_auc']:.4f}" - ]) - - table = ax_table.table( - cellText=table_data, - colLabels=["Samples", "Model", "Accuracy", "Precision", "Recall", "F1", "AUC"], - loc="center", cellLoc="center", - ) - table.auto_set_font_size(False) - table.set_fontsize(7.5) - table.scale(1, 1.3) - for (row, col), cell in table.get_celld().items(): - if row == 0: - cell.set_facecolor("#4472C4") - cell.set_text_props(color="white", fontweight="bold") - - # Analysis - ax_text = fig.add_axes([0.08, 0.05, 0.84, 0.52]) - ax_text.axis("off") - - best_final = max(all_results[-1]["xgb"]["accuracy"], - all_results[-1]["svm"]["accuracy"], - all_results[-1]["mlp"]["accuracy"]) - best_initial = max(all_results[0]["xgb"]["accuracy"], - all_results[0]["svm"]["accuracy"], - all_results[0]["mlp"]["accuracy"]) - improvement = best_final - best_initial - - analysis = ( - "Analysis\n\n" - f"Sample sizes tested: {', '.join(str(r['total']) for r in all_results)}\n" - f"Best accuracy at smallest size ({all_results[0]['total']}): {best_initial:.1%}\n" - f"Best accuracy at largest size ({all_results[-1]['total']}): {best_final:.1%}\n" - f"Improvement from scaling: {improvement:+.1%}pp\n\n" - ) - - if improvement > 0.10: - analysis += ( - "FINDING: Significant improvement with more data.\n" - "The 49 features have capacity to learn — the initial low accuracy was\n" - "primarily a data limitation. With sufficient training data, the hand-crafted\n" - "features can achieve useful detection rates on artwork.\n\n" - "Recommendation: Scale to even larger samples (10K+) and consider\n" - "integrating these features into the negate pipeline." - ) - elif improvement > 0.03: - analysis += ( - "FINDING: Modest improvement with more data.\n" - "More data helps somewhat, but accuracy is plateauing. The features\n" - "capture some genuine signal but are limited by their expressiveness.\n\n" - "Recommendation: The hand-crafted features are hitting a ceiling.\n" - "To push past this, the pipeline needs learned features — either\n" - "fine-tuned CLIP/DINOv2 or the self-supervised approach from\n" - "Zhong et al. (2026)." - ) - else: - analysis += ( - "FINDING: Minimal improvement with more data.\n" - "The features are saturated — adding more training data does not help.\n" - "The 49 hand-crafted features simply don't capture enough discriminative\n" - "information to distinguish AI art from human art.\n\n" - "Recommendation: Fundamentally different features are needed.\n" - "Hand-crafted statistics cannot match the representational power of\n" - "learned features for this task." - ) - - analysis += ( - "\n\nContext\n\n" - "This evaluation uses only the Hemg dataset where BOTH classes are artwork.\n" - "This is the hardest and most honest test — no content shortcuts.\n" - "All processing is CPU-only, 49 features per image.\n" - "5-fold stratified cross-validation with fixed random seed (42).\n" - ) - - ax_text.text(0, 1, analysis, transform=ax_text.transAxes, fontsize=9, - ha="left", va="top", fontfamily="serif") - - pdf.savefig(fig) - plt.close(fig) - - print(f"PDF saved to: {pdf_path}") - return pdf_path - - -def main(): - print("=" * 60) - print(" SCALING ANALYSIS: Art Detection vs Training Data Size") - print(" Dataset: Hemg (AI Art vs Real Art)") - print("=" * 60) - - print("\nLoading dataset...") - ds = load_dataset(REPO, split="train") - ds = ds.cast_column("image", HFImage(decode=True, mode="RGB")) - print(f" Total rows: {len(ds)}") - - extractor = ArtworkExtract() - all_results = [] - - # We extract at the largest size once, then subsample - max_per_class = max(SAMPLE_SIZES) // 2 - print(f"\nExtracting features for {max_per_class} per class...") - X_full, y_full, feature_names = extract_features_cached(ds, max_per_class, extractor) - print(f" Total: {len(y_full)} images, {X_full.shape[1]} features") - print(f" Balance: {np.sum(y_full==0)} genuine, {np.sum(y_full==1)} synthetic") - - for total in SAMPLE_SIZES: - per_class = total // 2 - print(f"\n{'='*40}") - print(f" Testing with {total} samples ({per_class} per class)") - print(f"{'='*40}") - - # Subsample from the full extraction - rng = np.random.RandomState(SEED) - idx_0 = np.where(y_full == 0)[0] - idx_1 = np.where(y_full == 1)[0] - chosen_0 = rng.choice(idx_0, size=min(per_class, len(idx_0)), replace=False) - chosen_1 = rng.choice(idx_1, size=min(per_class, len(idx_1)), replace=False) - chosen = np.concatenate([chosen_0, chosen_1]) - X = X_full[chosen] - y = y_full[chosen] - - result = {"total": len(y)} - for model in ["xgb", "svm", "mlp"]: - print(f" Running {model.upper()}...") - result[model] = run_cv(X, y, model) - print(f" acc={result[model]['accuracy']:.2%} auc={result[model]['roc_auc']:.4f}") - - all_results.append(result) - - # Save JSON - RESULTS_DIR.mkdir(exist_ok=True) - json_path = RESULTS_DIR / "scale_evaluation_results.json" - with open(json_path, "w") as f: - json.dump({ - "timestamp": datetime.now().isoformat(), - "dataset": REPO, - "feature_count": X_full.shape[1], - "results": all_results, - }, f, indent=2) - print(f"\nJSON saved to: {json_path}") - - # Generate PDF - print("\nGenerating PDF...") - generate_pdf(all_results) - - # Print summary - print(f"\n{'='*60}") - print(" SUMMARY") - print(f"{'='*60}") - for r in all_results: - best = max(r["xgb"]["accuracy"], r["svm"]["accuracy"], r["mlp"]["accuracy"]) - print(f" n={r['total']:5d} best_acc={best:.2%}") - - -if __name__ == "__main__": - main()