From 7c689fb640386392f33822fda3c2c7eeaceef647 Mon Sep 17 00:00:00 2001 From: JoyceJYW Date: Tue, 9 Dec 2025 00:56:55 -0500 Subject: [PATCH 1/4] Fix log directory path and add logs to .gitignore --- .gitignore | 1 + cmp_viewer/Cluster.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 7a4b67b..723849d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ __pycache__/ .vscode .idea +*.log \ No newline at end of file diff --git a/cmp_viewer/Cluster.py b/cmp_viewer/Cluster.py index a566c71..70088ea 100644 --- a/cmp_viewer/Cluster.py +++ b/cmp_viewer/Cluster.py @@ -17,7 +17,7 @@ import logging # Create Logs directory if it doesn't exist -log_dir = 'cmp_viewer/Logs' +log_dir = os.path.join(os.path.dirname(__file__), 'Logs') os.makedirs(log_dir, exist_ok=True) # Configure logging to write to BOTH file and console From bd8c8f1108ea96d06fdc40a9c4b9480062169acc Mon Sep 17 00:00:00 2001 From: JoyceJYW Date: Tue, 9 Dec 2025 04:19:24 -0500 Subject: [PATCH 2/4] Refactor: major code reorganization and module extraction --- cmp_viewer/ImageViewer.py | 36 +- cmp_viewer/ImageViewerBU.py | 19 +- cmp_viewer/{Cluster.py => cluster_widget.py} | 604 +----------------- cmp_viewer/clustering_algorithms.py | 318 +++++++++ .../{clusterImgSelect.py => dialogs.py} | 15 +- cmp_viewer/mask.py | 122 ++++ cmp_viewer/utils.py | 199 +++++- 7 files changed, 680 insertions(+), 633 deletions(-) rename cmp_viewer/{Cluster.py => cluster_widget.py} (52%) create mode 100644 cmp_viewer/clustering_algorithms.py rename cmp_viewer/{clusterImgSelect.py => dialogs.py} (97%) create mode 100644 cmp_viewer/mask.py diff --git a/cmp_viewer/ImageViewer.py b/cmp_viewer/ImageViewer.py index 9a121b0..3281eeb 100644 --- a/cmp_viewer/ImageViewer.py +++ b/cmp_viewer/ImageViewer.py @@ -2,46 +2,42 @@ # python -m cmp_viewer.imageviewer """ImageViewer is an initial core for opening and viewing CMP image stacks""" import sys -import cv2 import os -import glob -from cmp_viewer.rgb import * -from cmp_viewer.clusterImgSelect import * -from cmp_viewer.Cluster import * import nornir_imageregistration -import datetime from cmp_viewer import models from PIL import Image import typing +from numpy.typing import NDArray import numpy as np import re import csv +import cv2 +from cmp_viewer.dialogs import ImageSelectDlg +from cmp_viewer.rgb import create_composite_image +from cmp_viewer.cluster_widget import Cluster +from cmp_viewer import utils +from cmp_viewer import mask as mask_module +from cmp_viewer.utils import KMeansSettings, ISODATASettings from PyQt5.QtWidgets import QApplication, QGraphicsScene, QGraphicsView from PyQt5.QtWidgets import QMainWindow from PyQt5.QtWidgets import QScrollArea from PyQt5.QtWidgets import QWidget -from PyQt5.QtGui import QImage, QColor -from PyQt5.QtWidgets import QMenuBar +from PyQt5.QtGui import QImage from PyQt5.QtWidgets import QFileDialog from PyQt5.QtWidgets import QAction from PyQt5.QtCore import Qt -from PyQt5.QtWidgets import QGridLayout from PyQt5.QtWidgets import QLabel -from PyQt5.QtWidgets import QLineEdit from PyQt5.QtWidgets import QRadioButton from PyQt5.QtWidgets import QPushButton from PyQt5.QtWidgets import QVBoxLayout from PyQt5.QtWidgets import QHBoxLayout -from PyQt5.QtWidgets import QListView from PyQt5.QtWidgets import QListWidget from PyQt5.QtWidgets import QComboBox from PyQt5.QtWidgets import QSlider, QProgressDialog, QListWidgetItem, QColorDialog, QMenu, QInputDialog from PyQt5 import QtWidgets -from PyQt5 import QtGui -from PyQt5.QtGui import QPixmap, qRgb +from PyQt5.QtGui import QPixmap from PyQt5.QtWidgets import QMessageBox -from functools import partial __version__ = '1.5.2' __author__ = "RL Pfeiffer & NQN Studios" @@ -1215,7 +1211,7 @@ def merge_selected_clusters(self): Merge selected clusters into a single cluster. This method gathers the checked clusters in the Cluster Mask Visibility list, - calls the merge_clusters method in Cluster.py with those IDs, and invokes + calls the merge_clusters method in cluster_widget.py with those IDs, and invokes the on_cluster_callback with the updated labels/settings. """ if self.clusterview is None or self._masks is None: @@ -1233,7 +1229,7 @@ def merge_selected_clusters(self): QMessageBox.warning(self, "Insufficient Clusters Selected", "Please check at least two clusters to merge in the Cluster Mask Visibility list.") return - # Call the merge_clusters method in Cluster.py + # Call the merge_clusters method in cluster_widget.py new_labels, new_settings = self.clusterview.merge_clusters(checked_ids) if new_labels is None: @@ -1303,7 +1299,7 @@ def export_cluster_masks(self): output_path = os.path.join(output_dir, f"cluster_{cluster_id}_mask") # Use Cluster class to export the mask - success = self.clusterview.export_cluster_mask(cluster_id, output_path, file_format) + success = mask_module.export_cluster_mask(cluster_id, output_path, file_format) if not success: print(f"Failed to export mask for cluster {cluster_id}") @@ -1330,7 +1326,7 @@ def show_label_image(self, img, num_labels: int): self._num_labels = num_labels # Use Cluster class to prepare the image and get the color table - prepared_img, self._color_table = self.clusterview.prepare_label_image_for_display(img, num_labels) + prepared_img, self._color_table = utils.prepare_label_image_for_display(img, num_labels) self._clustered_image = prepared_img # Create QImage from the prepared image @@ -1382,7 +1378,7 @@ def show_label_image(self, img, num_labels: int): # Calculate optimal scale factor if self.clusterview is not None: - scale_factor = self.clusterview.calculate_optimal_scale_factor(height, width) + scale_factor = utils.calculate_optimal_scale_factor(height, width) else: max_pixels = 500000 scale_factor = np.sqrt(max_pixels / (height * width)) if height * width > max_pixels else 1.0 @@ -1408,7 +1404,7 @@ def show_label_image(self, img, num_labels: int): # Create mask overlay if self.clusterview is not None: - overlay = self.clusterview.create_mask_overlay( + overlay = mask_module.create_mask_overlay( mask, color, self._mask_opacity, target_width=new_width, target_height=new_height ) diff --git a/cmp_viewer/ImageViewerBU.py b/cmp_viewer/ImageViewerBU.py index 9d219b1..4d2c33b 100644 --- a/cmp_viewer/ImageViewerBU.py +++ b/cmp_viewer/ImageViewerBU.py @@ -2,12 +2,11 @@ # python -m cmp_viewer.imageviewer """ImageViewer is an initial core for opening and viewing CMP image stacks""" import sys -import cv2 -import os import glob -from cmp_viewer.rgb import * -from cmp_viewer.clusterImgSelect import * -from cmp_viewer.Cluster import * + +from cmp_viewer.dialogs import ImageSelectDlg +from cmp_viewer.display import create_composite_image +from cmp_viewer.cluster_widget import * import nornir_imageregistration from cmp_viewer import models from PIL import Image @@ -18,31 +17,29 @@ from PyQt5.QtWidgets import QMainWindow from PyQt5.QtWidgets import QScrollArea from PyQt5.QtWidgets import QWidget -from PyQt5.QtGui import QImage, QColor -from PyQt5.QtWidgets import QMenuBar +from PyQt5.QtGui import QImage from PyQt5.QtWidgets import QFileDialog from PyQt5.QtWidgets import QAction from PyQt5.QtCore import Qt -from PyQt5.QtWidgets import QGridLayout from PyQt5.QtWidgets import QLabel from PyQt5.QtWidgets import QLineEdit from PyQt5.QtWidgets import QRadioButton from PyQt5.QtWidgets import QPushButton from PyQt5.QtWidgets import QVBoxLayout from PyQt5.QtWidgets import QHBoxLayout -from PyQt5.QtWidgets import QListView from PyQt5.QtWidgets import QListWidget from PyQt5.QtWidgets import QComboBox from PyQt5.QtWidgets import QSlider, QProgressDialog, QListWidgetItem from PyQt5 import QtWidgets -from PyQt5 import QtGui from PyQt5.QtGui import QPixmap, qRgb from PyQt5.QtWidgets import QMessageBox -from functools import partial __version__ = '1.0' __author__ = "RL Pfeiffer & NQN Studios" +from cmp_viewer.utils import KMeansSettings + + class ImageViewerUi(QMainWindow): rawImages = [] fileNameList = [] diff --git a/cmp_viewer/Cluster.py b/cmp_viewer/cluster_widget.py similarity index 52% rename from cmp_viewer/Cluster.py rename to cmp_viewer/cluster_widget.py index 70088ea..4185a15 100644 --- a/cmp_viewer/Cluster.py +++ b/cmp_viewer/cluster_widget.py @@ -1,20 +1,21 @@ import typing from typing import Callable, Tuple, List, Any, Dict -import collections import os -import threading import cv2 - import numpy as np from numpy.typing import NDArray -from PyQt5.QtGui import QPixmap, QImage, qRgb, QColor +from PyQt5.QtGui import QImage, qRgb, QColor from sklearn.cluster import KMeans from PIL import Image -from PyQt5.QtCore import Qt, QObject, QThread, pyqtSignal -from PyQt5.QtWidgets import QWidget, QVBoxLayout, QListWidget, QPushButton, QInputDialog, QGraphicsPixmapItem, QProgressDialog, QMessageBox +from PyQt5.QtCore import Qt, QThread +from PyQt5.QtWidgets import QWidget, QVBoxLayout, QListWidget, QPushButton, QInputDialog, QProgressDialog, QMessageBox import cmp_viewer.models import cmp_viewer.utils import logging +from cmp_viewer.clustering_algorithms import ClusteringWorker, isodata_algorithm +from cmp_viewer.utils import KMeansSettings, ISODATASettings +import cmp_viewer.mask as mask_module +from cmp_viewer import utils # Create Logs directory if it doesn't exist log_dir = os.path.join(os.path.dirname(__file__), 'Logs') @@ -30,7 +31,7 @@ ] ) logger = logging.getLogger(__name__) -logger.info("=== Cluster.py module loaded ===") +logger.info("=== cluster_widget.py module loaded ===") """ @@ -38,111 +39,6 @@ It implements K-means clustering, ISODATA clustering, and visualization of clustered images. """ -class KMeansSettings(typing.NamedTuple): - """ - A named tuple for storing K-means clustering parameters. - - Attributes: - n_clusters (int): Number of clusters to form. - init (str): Method for initialization ('random', 'k-means++', etc.). - n_init (int): Number of times the k-means algorithm will be run with different seeds. - max_iter (int): Maximum number of iterations for a single run. - tol (float): Relative tolerance for convergence. - random_state (int): Seed for random number generation for reproducibility. - """ - n_clusters: int - init: str - n_init: int - max_iter: int - tol: float - random_state: int - -class ISODATASettings(typing.NamedTuple): - """ - A named tuple for storing ISODATA clustering parameters. - - ISODATA (Iterative Self-Organizing Data Analysis Technique) is an extension - of k-means that allows for merging and splitting of clusters based on various criteria. - - Attributes: - n_clusters (int): Initial number of clusters to form. - max_iter (int): Maximum number of iterations. - min_samples (int): Minimum number of samples in a cluster. - max_std_dev (float): Maximum standard deviation within a cluster. - min_cluster_distance (float): Minimum distance between clusters for merging. - max_merge_pairs (int): Maximum number of cluster pairs to merge per iteration. - random_state (int): Seed for random number generation for reproducibility. - """ - n_clusters: int - max_iter: int - min_samples: int - max_std_dev: float - min_cluster_distance: float - max_merge_pairs: int - random_state: int - - -class ClusteringWorker(QObject): - progress = pyqtSignal(int, str) - finished = pyqtSignal(object, object) # labels (ndarray), settings - error = pyqtSignal(str) - canceled = pyqtSignal() - - def __init__(self, *, algorithm: str, data: np.ndarray, settings: typing.Any, image_shape: typing.Tuple[int, int], isodata_fn: typing.Callable = None): - super().__init__() - self.algorithm = algorithm - self.data = data - self.settings = settings - self.image_shape = image_shape - self.isodata_fn = isodata_fn - self._cancel_event = threading.Event() - - def request_cancel(self): - self._cancel_event.set() - - def run(self): - try: - if self.algorithm == 'kmeans': - self.progress.emit(0, 'Initializing k-means...') - # sklearn expects samples as rows; upstream provides pixels as (n_selected_images, n_pixels) - pixels = self.data - km = KMeans(n_clusters=self.settings.n_clusters, - init=self.settings.init, - n_init=self.settings.n_init, - max_iter=self.settings.max_iter, - tol=self.settings.tol, - random_state=self.settings.random_state) - # Indeterminate: cannot report inner progress - labels = None - km.fit(pixels.T) - if self._cancel_event.is_set(): - self.canceled.emit() - return - labels = km.labels_.reshape(self.image_shape) - self.progress.emit(100, 'k-means complete') - self.finished.emit(labels, self.settings) - elif self.algorithm == 'isodata': - if self.isodata_fn is None: - raise RuntimeError('ISODATA function not provided') - def cb(pct: int, msg: str): - self.progress.emit(pct, msg) - labels_flat = self.isodata_fn(self.data, self.settings, progress_cb=cb, cancel_event=self._cancel_event) - if labels_flat is None: - # Treat None as canceled - self.canceled.emit() - return - labels = labels_flat.reshape(self.image_shape) - self.progress.emit(100, 'ISODATA complete') - self.finished.emit(labels, self.settings) - else: - raise RuntimeError(f'Unknown algorithm: {self.algorithm}') - except Exception as e: - # If cancellation expressed via exception, map to canceled - msg = str(e) - if 'CANCELED' in msg.upper() or isinstance(e, KeyboardInterrupt): - self.canceled.emit() - else: - self.error.emit(msg) class Cluster(QWidget): """ @@ -444,7 +340,7 @@ def _start_clustering_worker(self, *, algorithm: str, data: np.ndarray, settings # Spin up thread and worker self._worker_thread = QThread(self) - self._worker = ClusteringWorker(algorithm=algorithm, data=data, settings=settings, image_shape=image_shape, isodata_fn=self._isodata_algorithm) + self._worker = ClusteringWorker(algorithm=algorithm, data=data, settings=settings, image_shape=image_shape, isodata_fn=isodata_algorithm) self._worker.moveToThread(self._worker_thread) # Wire signals @@ -477,7 +373,7 @@ def on_finished(labels, returned_settings): # Update state and invoke callback self.labels = labels n_clusters = len(np.unique(self.labels)) if isinstance(returned_settings, ISODATASettings) else returned_settings.n_clusters - self.masks = self.generate_masks(self.labels, n_clusters) + self.masks = mask_module.generate_masks(self.labels, n_clusters) # Save to undo stack self.undo_stack.append((np.copy(self.labels), {k: (mask.copy(), color) for k, (mask, color) in self.masks.items()})) if len(self.undo_stack) > self.undo_stack_max_size: @@ -525,342 +421,10 @@ def on_error(message: str): self._worker_thread.start() self._progress_dialog.show() - def _isodata_algorithm(self, data: NDArray, settings: ISODATASettings, progress_cb: typing.Callable[[int, str], None] = None, cancel_event: threading.Event = None) -> NDArray[int]: - - """ - Implement the ISODATA clustering algorithm with optional progress and cancellation support. - - ISODATA (Iterative Self-Organizing Data Analysis Technique) is an extension - of k-means that allows for merging and splitting of clusters based on various criteria. - - Args: - data (NDArray): Data to cluster, shape (n_features, n_samples) - settings (ISODATASettings): Settings for the ISODATA algorithm - - Returns: - NDArray[int]: Cluster labels for each sample - """ - logger.info(f"=== _isodata_algorithm started ===") - logger.info(f"Data shape: {data.shape} (features x samples)") - logger.info(f"Initial clusters: {settings.n_clusters}") - - np.random.seed(settings.random_state) - n_samples = data.shape[1] - n_features = data.shape[0] - - # Adjust number of clusters if it exceeds number of samples - settings = settings._replace(n_clusters=min(settings.n_clusters, n_samples)) - logger.info(f"Adjusted clusters to {settings.n_clusters} (cannot exceed {n_samples} samples)") - - # Initialize centroids randomly - # Select k random samples as initial centroids - indices = np.random.choice(n_samples, settings.n_clusters, replace=False) - centroids = data[:, indices] - logger.info(f"Initialized {settings.n_clusters} centroids") - - # Initialize labels - labels = np.zeros(n_samples, dtype=int) - - for iteration in range(settings.max_iter): - logger.info(f"--- Iteration {iteration + 1}/{settings.max_iter} ---") - - # Progress and cancel checks - if cancel_event is not None and cancel_event.is_set(): - logger.info("Algorithm canceled by user") - return None - if progress_cb is not None: - pct = int((iteration / max(1, settings.max_iter)) * 100) - progress_cb(pct, f"ISODATA: Iteration {iteration+1}/{settings.max_iter}") - # Store current number of clusters before any modifications - old_n_clusters = settings.n_clusters - - # Assign samples to closest centroids (like k-means) - logger.debug(f"Assigning samples to {settings.n_clusters} centroids...") - distances = np.zeros((settings.n_clusters, n_samples)) - for i in range(settings.n_clusters): - diff = data - centroids[:, i].reshape(-1, 1) - distances[i] = np.sum(diff**2, axis=0) - - # Assign each sample to the closest centroid - labels = np.argmin(distances, axis=0) - unique_labels_assigned = len(np.unique(labels)) - logger.info(f"Assigned samples to {unique_labels_assigned} unique clusters") - - # Make a copy of the current centroids for convergence check - old_centroids = centroids.copy() - - # Update centroids based on new assignments - logger.debug("Updating centroids...") - for i in range(settings.n_clusters): - cluster_samples = data[:, labels == i] - if cluster_samples.shape[1] > 0: - centroids[:, i] = np.mean(cluster_samples, axis=1) - - # Check for empty clusters and handle them - empty_clusters = [i for i in range(settings.n_clusters) if np.sum(labels == i) == 0] - if empty_clusters: - logger.info(f"Handling {len(empty_clusters)} empty clusters") - - for i in range(settings.n_clusters): - if np.sum(labels == i) == 0: - # Find the cluster with the most samples - largest_cluster = np.argmax([np.sum(labels == j) for j in range(settings.n_clusters)]) - # Find the samples furthest from the centroid in the largest cluster - cluster_samples = data[:, labels == largest_cluster] - if cluster_samples.shape[1] > 0: - diff = cluster_samples - centroids[:, largest_cluster].reshape(-1, 1) - distances = np.sum(diff ** 2, axis=0) - furthest_sample_idx = np.argmax(distances) - # Set the empty cluster's centroid to this sample - centroids[:, i] = cluster_samples[:, furthest_sample_idx] - # Reassign some samples to this new centroid - diff = data - centroids[:, i].reshape(-1, 1) - new_distances = np.sum(diff ** 2, axis=0) - closest_to_new = np.argsort(new_distances)[:settings.min_samples] - labels[closest_to_new] = i - - # ISODATA specific steps: - logger.debug("Applying ISODATA operations...") - - # 1. Discard clusters with too few samples - for i in range(settings.n_clusters): - if np.sum(labels == i) < settings.min_samples: - # Reassign samples from small clusters to the closest remaining cluster - small_cluster_samples = np.where(labels == i)[0] - for sample_idx in small_cluster_samples: - # Find the next closest centroid - sample = data[:, sample_idx] - distances = np.array([np.sum((sample - centroids[:, j])**2) for j in range(settings.n_clusters) if j != i]) - closest_centroid = np.argmin(distances) - # Adjust for the removed index - if closest_centroid >= i: - closest_centroid += 1 - labels[sample_idx] = closest_centroid - - # Remove the centroid - centroids = np.delete(centroids, i, axis=1) - - # Update labels to reflect the removed centroid - labels[labels > i] -= 1 - - # Adjust the number of clusters - settings = settings._replace(n_clusters=settings.n_clusters - 1) - - # Break to recalculate everything with the new number of clusters - break - - # 2. Split clusters with large standard deviation - for i in range(settings.n_clusters): - cluster_samples = data[:, labels == i] - if cluster_samples.shape[1] > 2 * settings.min_samples: - # Calculate standard deviation of the cluster - std_dev = np.std(cluster_samples, axis=1) - - # If any dimension has std dev greater than the threshold, split the cluster - if np.any(std_dev > settings.max_std_dev): - # Add a new centroid - new_centroid_idx = settings.n_clusters - settings = settings._replace(n_clusters=settings.n_clusters + 1) - - # Find the dimension with the largest std dev - max_std_dim = np.argmax(std_dev) - - # Create two new centroids by moving along this dimension - new_centroids = np.column_stack(( - centroids, - centroids[:, i].copy() - )) - - # Adjust the centroids along the dimension with largest variance - new_centroids[max_std_dim, i] -= std_dev[max_std_dim] - new_centroids[max_std_dim, new_centroid_idx] += std_dev[max_std_dim] - - centroids = new_centroids - - # Reassign samples to the new centroids - diff1 = data - centroids[:, i].reshape(-1, 1) - diff2 = data - centroids[:, new_centroid_idx].reshape(-1, 1) - dist1 = np.sum(diff1**2, axis=0) - dist2 = np.sum(diff2**2, axis=0) - - # Assign to the closer of the two centroids - labels[np.logical_and(labels == i, dist2 < dist1)] = new_centroid_idx - - # Break to recalculate everything with the new number of clusters - break - - # 3. Merge clusters that are close to each other - if settings.n_clusters >= 2: - # Calculate distances between all pairs of centroids - centroid_distances = np.zeros((settings.n_clusters, settings.n_clusters)) - for i in range(settings.n_clusters): - for j in range(i+1, settings.n_clusters): - centroid_distances[i, j] = np.sqrt(np.sum((centroids[:, i] - centroids[:, j])**2)) - centroid_distances[j, i] = centroid_distances[i, j] - - # Find pairs of clusters to merge (closest pairs first) - merge_candidates = [] - for i in range(settings.n_clusters): - for j in range(i+1, settings.n_clusters): - if centroid_distances[i, j] < settings.min_cluster_distance: - merge_candidates.append((i, j, centroid_distances[i, j])) - - # Sort by distance (closest first) - merge_candidates.sort(key=lambda x: x[2]) - - # Merge up to max_merge_pairs pairs - merged_clusters = set() - for i, j, _ in merge_candidates[:settings.max_merge_pairs]: - if i in merged_clusters or j in merged_clusters: - continue - - # Merge clusters i and j - # Calculate the weighted average of the centroids - ni = np.sum(labels == i) - nj = np.sum(labels == j) - - if ni == 0 or nj == 0: - continue - - new_centroid = (ni * centroids[:, i] + nj * centroids[:, j]) / (ni + nj) - - # Update centroid i with the merged centroid - centroids[:, i] = new_centroid - - # Reassign samples from cluster j to cluster i - labels[labels == j] = i - - # Mark cluster j as merged - merged_clusters.add(j) - - # Remove merged centroids - if merged_clusters: - # Convert to list and sort in descending order to avoid index issues - merged_list = sorted(list(merged_clusters), reverse=True) - for idx in merged_list: - centroids = np.delete(centroids, idx, axis=1) - # Update labels to reflect the removed centroid - for old_idx in range(idx, settings.n_clusters): - labels[labels == old_idx] = old_idx - 1 - - # Update the number of clusters - settings = settings._replace(n_clusters=settings.n_clusters - len(merged_clusters)) - - # At the end of iteration - logger.info(f"End of iteration {iteration + 1}: {settings.n_clusters} clusters") - - # Check for convergence only if the number of clusters hasn't changed - if old_n_clusters == settings.n_clusters: - if np.allclose(old_centroids[:, :settings.n_clusters], centroids): - logger.info(f"Converged at iteration {iteration + 1}") - break - # If number of clusters changed, continue to next iteration - else: - logger.info(f"Number of clusters changed from {old_n_clusters} to {settings.n_clusters}") - continue - - # Ensure labels are consecutive integers starting from 0 - unique_labels = np.unique(labels) - label_map = {old: new for new, old in enumerate(unique_labels)} - new_labels = np.array([label_map[l] for l in labels]) - logger.info(f"=== ISODATA completed with {len(unique_labels)} final clusters ===") - - return new_labels - - def generate_distinct_colors(self, n_colors: int) -> List[QColor]: - """ - Generate a list of perceptually distinct colors. - - This method creates a list of colors that are visually distinct from each other, - suitable for visualizing different clusters. It uses a combination of predefined - color palettes for small numbers of clusters and algorithmic generation for larger numbers. - - Args: - n_colors (int): Number of distinct colors to generate. - - Returns: - List[QColor]: List of QColor objects representing distinct colors. - """ - # For small numbers of clusters, use a predefined set of distinct colors - # These colors are chosen to be visually distinct and colorblind-friendly - predefined_colors = [ - QColor(230, 25, 75), # Red - QColor(60, 180, 75), # Green - QColor(255, 225, 25), # Yellow - QColor(0, 130, 200), # Blue - QColor(245, 130, 48), # Orange - QColor(145, 30, 180), # Purple - QColor(70, 240, 240), # Cyan - QColor(240, 50, 230), # Magenta - QColor(210, 245, 60), # Lime - QColor(250, 190, 212), # Pink - QColor(0, 128, 128), # Teal - QColor(220, 190, 255), # Lavender - QColor(170, 110, 40), # Brown - QColor(255, 250, 200), # Beige - QColor(128, 0, 0), # Maroon - QColor(170, 255, 195), # Mint - QColor(128, 128, 0), # Olive - QColor(255, 215, 180), # Coral - QColor(0, 0, 128), # Navy - QColor(128, 128, 128), # Grey - ] - - if n_colors <= len(predefined_colors): - return predefined_colors[:n_colors] - - # For larger numbers, use HSV color space with golden ratio to distribute hues - colors = predefined_colors.copy() - - # Add more colors using the golden ratio method for hue distribution - golden_ratio_conjugate = 0.618033988749895 # 1 / phi - h = 0.1 # Starting hue - s = 0.8 # Saturation - v = 0.95 # Value - - while len(colors) < n_colors: - h = (h + golden_ratio_conjugate) % 1.0 - # Vary saturation and value slightly for better distinction - s_variation = 0.7 + (len(colors) % 3) * 0.1 - v_variation = 0.85 + (len(colors) % 2) * 0.1 - - # Convert to RGB and create QColor - h_degrees = h * 360.0 - color = QColor.fromHsv(int(h_degrees), int(s_variation * 255), int(v_variation * 255)) - colors.append(color) - - return colors - - def generate_masks(self, labels: NDArray[int], n_clusters: int) -> Dict[int, Tuple[NDArray[bool], QColor]]: - """ - Generate binary masks and assign a unique color for each cluster. - - This method creates a binary mask for each unique cluster label and assigns - a visually distinct color to each cluster using a perceptually-based approach. - Args: - labels (NDArray[int]): Array of cluster labels for each pixel. - n_clusters (int): Number of clusters (may be different from actual unique labels). - - Returns: - Dict[int, Tuple[NDArray[bool], QColor]]: Dictionary mapping cluster IDs to - tuples of (binary mask, color). - """ - masks = {} - unique_labels = np.unique(labels) - - # Generate distinct colors for all unique labels - colors = self.generate_distinct_colors(len(unique_labels)) - for idx, cluster_id in enumerate(unique_labels): - # Create binary mask for this cluster (True where label matches cluster_id) - mask = (labels == cluster_id) - # Assign a distinct color from our generated palette - color = colors[idx] - masks[cluster_id] = (mask, color) - return masks def cluster_on_mask(self, mask: NDArray[bool], n_clusters: int) -> Tuple[NDArray[int], KMeansSettings]: """ @@ -926,7 +490,7 @@ def cluster_on_mask(self, mask: NDArray[bool], n_clusters: int) -> Tuple[NDArray # Update masks with the new labels self.labels = new_labels - self.masks = self.generate_masks(self.labels, len(np.unique(new_labels))) + self.masks = mask_module.generate_masks(self.labels, len(np.unique(new_labels))) # Save state to undo stack self.undo_stack.append((np.copy(self.labels), {k: (mask.copy(), color) for k, (mask, color) in self.masks.items()})) @@ -995,7 +559,7 @@ def isodata_on_mask(self, mask: NDArray[bool], settings: ISODATASettings) -> Tup # Run ISODATA on averaged masked pixels using the algorithm method logger.info(f"Starting ISODATA algorithm with {settings.n_clusters} initial clusters") - sub_labels = self._isodata_algorithm(avg_masked_pixels, settings) + sub_labels = isodata_algorithm(avg_masked_pixels, settings) if sub_labels is None: logger.warning("ISODATA algorithm returned None") @@ -1014,7 +578,7 @@ def isodata_on_mask(self, mask: NDArray[bool], settings: ISODATASettings) -> Tup # Update masks with the new labels self.labels = new_labels - self.masks = self.generate_masks(self.labels, len(np.unique(new_labels))) + self.masks = mask_module.generate_masks(self.labels, len(np.unique(new_labels))) logger.info(f"Generated {len(self.masks)} masks") # Save state to undo stack @@ -1070,7 +634,7 @@ def merge_clusters(self, cluster_ids: List[int]) -> Tuple[NDArray[int], KMeansSe # Update labels and masks self.labels = new_labels - self.masks = self.generate_masks(self.labels, len(np.unique(new_labels))) + self.masks = mask_module.generate_masks(self.labels, len(np.unique(new_labels))) # Save state to undo stack self.undo_stack.append((np.copy(self.labels), {k: (mask.copy(), color) for k, (mask, color) in self.masks.items()})) @@ -1137,154 +701,14 @@ def create_label_image(self, labels: NDArray[int], num_labels: int) -> Image.Ima """ return cmp_viewer.utils.numpy_labels_to_pillow_image(labels) - def create_mask_overlay(self, mask: NDArray[bool], color: QColor, opacity: int, - target_width: int = None, target_height: int = None) -> QImage: - """ - Create a QImage overlay for a cluster mask with specified color and opacity. - - Args: - mask (NDArray[bool]): Boolean mask for the cluster - color (QColor): Color to use for the mask - opacity (int): Opacity value (0-255) - target_width (int, optional): Target width for resizing - target_height (int, optional): Target height for resizing - - Returns: - QImage: Transparent overlay with the mask colored - """ - height, width = mask.shape - - # Resize mask if target dimensions are provided - if target_width is not None and target_height is not None: - mask_small = cv2.resize(mask.astype(np.uint8), (target_width, target_height), - interpolation=cv2.INTER_NEAREST).astype(bool) - width, height = target_width, target_height - else: - mask_small = mask - - # Create transparent overlay - overlay = QImage(width, height, QImage.Format_ARGB32) - overlay.fill(Qt.transparent) - - # Apply color to mask - # Note: QImage.Format_ARGB32 expects BGRA byte order in memory on little-endian systems. - mask_data = np.zeros((height, width, 4), dtype=np.uint8) - mask_data[mask_small, 0] = color.blue() # B - mask_data[mask_small, 1] = color.green() # G - mask_data[mask_small, 2] = color.red() # R - mask_data[mask_small, 3] = opacity # A - - # Convert to QImage - overlay_data = mask_data.tobytes() - overlay = QImage(overlay_data, width, height, QImage.Format_ARGB32) - - return overlay - - def export_cluster_mask(self, cluster_id: int, output_path: str, file_format: str = "tiff"): - """ - Export a single cluster mask to a file. - - Args: - cluster_id (int): ID of the cluster to export - output_path (str): Path to save the mask - file_format (str, optional): File format (tiff, png, etc.) - - Returns: - bool: True if export was successful, False otherwise - """ - if self.masks is None or cluster_id not in self.masks: - return False - - mask, _ = self.masks[cluster_id] - if mask is None: - return False - - # Convert boolean mask to uint8 (0 or 255) - mask_array = mask.astype(np.uint8) * 255 - mask_image = Image.fromarray(mask_array, mode='L') - - # Ensure output path has correct extension - if not output_path.lower().endswith(f".{file_format.lower()}"): - output_path = f"{output_path}.{file_format.lower()}" - - mask_image.save(output_path) - return True - - def calculate_optimal_scale_factor(self, height: int, width: int, max_pixels: int = 500000) -> float: - """ - Calculate optimal scale factor to resize an image to a maximum number of pixels. - - Args: - height (int): Original height - width (int): Original width - max_pixels (int, optional): Maximum number of pixels in the resized image - - Returns: - float: Scale factor to apply - """ - if height * width <= max_pixels: - return 1.0 - - return np.sqrt(max_pixels / (height * width)) - def create_color_table(self, num_labels: int) -> List[int]: - """ - Create a color table for visualizing cluster labels. - - This method generates a list of perceptually distinct colors for visualizing - cluster labels. It uses the same color generation approach as generate_masks - to ensure consistency across different visualization methods. - - Args: - num_labels (int): Number of unique labels - Returns: - List[int]: List of RGB values as integers - """ - # Generate distinct colors using our perceptual color generation method - colors = self.generate_distinct_colors(num_labels) - # Convert QColors to qRgb integers - return [qRgb(color.red(), color.green(), color.blue()) for color in colors] - def create_palette_from_color_table(self, color_table: List[int]) -> List[int]: - """ - Create a palette from a color table for use with PIL images. - Args: - color_table (List[int]): List of RGB values as integers - Returns: - List[int]: Flattened list of RGB values for PIL palette - """ - palette = [] - for rgb in color_table: - r = (rgb >> 16) & 0xFF - g = (rgb >> 8) & 0xFF - b = rgb & 0xFF - palette.extend([r, g, b]) - return palette - - def prepare_label_image_for_display(self, img: Image.Image, num_labels: int) -> Tuple[Image.Image, List[int]]: - """ - Prepare a label image for display by setting its palette. - Args: - img (Image.Image): PIL Image with label data - num_labels (int): Number of unique labels - Returns: - Tuple[Image.Image, List[int]]: Tuple containing the prepared image and color table - """ - # Create color table and palette - color_table = self.create_color_table(num_labels) - palette = self.create_palette_from_color_table(color_table) - # Convert image to palette mode if needed - if img.mode != 'P': - img = img.convert('P') - # Set the palette - img.putpalette(palette) - return img, color_table diff --git a/cmp_viewer/clustering_algorithms.py b/cmp_viewer/clustering_algorithms.py new file mode 100644 index 0000000..e4a5161 --- /dev/null +++ b/cmp_viewer/clustering_algorithms.py @@ -0,0 +1,318 @@ +""" +Clustering algorithm implementations (K-means, ISODATA). +""" +import typing +import threading +import numpy as np +from numpy.typing import NDArray +from PyQt5.QtCore import QObject, pyqtSignal +from sklearn.cluster import KMeans +import logging +from cmp_viewer.utils import ISODATASettings, KMeansSettings + +logger = logging.getLogger(__name__) + + +class ClusteringWorker(QObject): + progress = pyqtSignal(int, str) + finished = pyqtSignal(object, object) # labels (ndarray), settings + error = pyqtSignal(str) + canceled = pyqtSignal() + + def __init__(self, *, algorithm: str, data: np.ndarray, settings: typing.Any, image_shape: typing.Tuple[int, int], isodata_fn: typing.Callable = None): + super().__init__() + self.algorithm = algorithm + self.data = data + self.settings = settings + self.image_shape = image_shape + self.isodata_fn = isodata_fn + self._cancel_event = threading.Event() + + def request_cancel(self): + self._cancel_event.set() + + def run(self): + try: + if self.algorithm == 'kmeans': + self.progress.emit(0, 'Initializing k-means...') + # sklearn expects samples as rows; upstream provides pixels as (n_selected_images, n_pixels) + pixels = self.data + km = KMeans(n_clusters=self.settings.n_clusters, + init=self.settings.init, + n_init=self.settings.n_init, + max_iter=self.settings.max_iter, + tol=self.settings.tol, + random_state=self.settings.random_state) + # Indeterminate: cannot report inner progress + labels = None + km.fit(pixels.T) + if self._cancel_event.is_set(): + self.canceled.emit() + return + labels = km.labels_.reshape(self.image_shape) + self.progress.emit(100, 'k-means complete') + self.finished.emit(labels, self.settings) + elif self.algorithm == 'isodata': + if self.isodata_fn is None: + raise RuntimeError('ISODATA function not provided') + def cb(pct: int, msg: str): + self.progress.emit(pct, msg) + labels_flat = self.isodata_fn(self.data, self.settings, progress_cb=cb, cancel_event=self._cancel_event) + if labels_flat is None: + # Treat None as canceled + self.canceled.emit() + return + labels = labels_flat.reshape(self.image_shape) + self.progress.emit(100, 'ISODATA complete') + self.finished.emit(labels, self.settings) + else: + raise RuntimeError(f'Unknown algorithm: {self.algorithm}') + except Exception as e: + # If cancellation expressed via exception, map to canceled + msg = str(e) + if 'CANCELED' in msg.upper() or isinstance(e, KeyboardInterrupt): + self.canceled.emit() + else: + self.error.emit(msg) + +def isodata_algorithm(data: NDArray, settings: ISODATASettings, progress_cb: typing.Callable[[int, str], None] = None, cancel_event: threading.Event = None) -> NDArray[int]: + + """ + Implement the ISODATA clustering algorithm with optional progress and cancellation support. + + ISODATA (Iterative Self-Organizing Data Analysis Technique) is an extension + of k-means that allows for merging and splitting of clusters based on various criteria. + + Args: + data (NDArray): Data to cluster, shape (n_features, n_samples) + settings (ISODATASettings): Settings for the ISODATA algorithm + + Returns: + NDArray[int]: Cluster labels for each sample + """ + logger.info(f"=== _isodata_algorithm started ===") + logger.info(f"Data shape: {data.shape} (features x samples)") + logger.info(f"Initial clusters: {settings.n_clusters}") + + np.random.seed(settings.random_state) + n_samples = data.shape[1] + n_features = data.shape[0] + + # Adjust number of clusters if it exceeds number of samples + settings = settings._replace(n_clusters=min(settings.n_clusters, n_samples)) + logger.info(f"Adjusted clusters to {settings.n_clusters} (cannot exceed {n_samples} samples)") + + # Initialize centroids randomly + # Select k random samples as initial centroids + indices = np.random.choice(n_samples, settings.n_clusters, replace=False) + centroids = data[:, indices] + logger.info(f"Initialized {settings.n_clusters} centroids") + + # Initialize labels + labels = np.zeros(n_samples, dtype=int) + + for iteration in range(settings.max_iter): + logger.info(f"--- Iteration {iteration + 1}/{settings.max_iter} ---") + + # Progress and cancel checks + if cancel_event is not None and cancel_event.is_set(): + logger.info("Algorithm canceled by user") + return None + if progress_cb is not None: + pct = int((iteration / max(1, settings.max_iter)) * 100) + progress_cb(pct, f"ISODATA: Iteration {iteration+1}/{settings.max_iter}") + # Store current number of clusters before any modifications + old_n_clusters = settings.n_clusters + + # Assign samples to closest centroids (like k-means) + logger.debug(f"Assigning samples to {settings.n_clusters} centroids...") + distances = np.zeros((settings.n_clusters, n_samples)) + for i in range(settings.n_clusters): + diff = data - centroids[:, i].reshape(-1, 1) + distances[i] = np.sum(diff**2, axis=0) + + # Assign each sample to the closest centroid + labels = np.argmin(distances, axis=0) + unique_labels_assigned = len(np.unique(labels)) + logger.info(f"Assigned samples to {unique_labels_assigned} unique clusters") + + # Make a copy of the current centroids for convergence check + old_centroids = centroids.copy() + + # Update centroids based on new assignments + logger.debug("Updating centroids...") + for i in range(settings.n_clusters): + cluster_samples = data[:, labels == i] + if cluster_samples.shape[1] > 0: + centroids[:, i] = np.mean(cluster_samples, axis=1) + + # Check for empty clusters and handle them + empty_clusters = [i for i in range(settings.n_clusters) if np.sum(labels == i) == 0] + if empty_clusters: + logger.info(f"Handling {len(empty_clusters)} empty clusters") + + for i in range(settings.n_clusters): + if np.sum(labels == i) == 0: + # Find the cluster with the most samples + largest_cluster = np.argmax([np.sum(labels == j) for j in range(settings.n_clusters)]) + # Find the samples furthest from the centroid in the largest cluster + cluster_samples = data[:, labels == largest_cluster] + if cluster_samples.shape[1] > 0: + diff = cluster_samples - centroids[:, largest_cluster].reshape(-1, 1) + distances = np.sum(diff ** 2, axis=0) + furthest_sample_idx = np.argmax(distances) + # Set the empty cluster's centroid to this sample + centroids[:, i] = cluster_samples[:, furthest_sample_idx] + # Reassign some samples to this new centroid + diff = data - centroids[:, i].reshape(-1, 1) + new_distances = np.sum(diff ** 2, axis=0) + closest_to_new = np.argsort(new_distances)[:settings.min_samples] + labels[closest_to_new] = i + + # ISODATA specific steps: + logger.debug("Applying ISODATA operations...") + + # 1. Discard clusters with too few samples + for i in range(settings.n_clusters): + if np.sum(labels == i) < settings.min_samples: + # Reassign samples from small clusters to the closest remaining cluster + small_cluster_samples = np.where(labels == i)[0] + for sample_idx in small_cluster_samples: + # Find the next closest centroid + sample = data[:, sample_idx] + distances = np.array([np.sum((sample - centroids[:, j])**2) for j in range(settings.n_clusters) if j != i]) + closest_centroid = np.argmin(distances) + # Adjust for the removed index + if closest_centroid >= i: + closest_centroid += 1 + labels[sample_idx] = closest_centroid + + # Remove the centroid + centroids = np.delete(centroids, i, axis=1) + + # Update labels to reflect the removed centroid + labels[labels > i] -= 1 + + # Adjust the number of clusters + settings = settings._replace(n_clusters=settings.n_clusters - 1) + + # Break to recalculate everything with the new number of clusters + break + + # 2. Split clusters with large standard deviation + for i in range(settings.n_clusters): + cluster_samples = data[:, labels == i] + if cluster_samples.shape[1] > 2 * settings.min_samples: + # Calculate standard deviation of the cluster + std_dev = np.std(cluster_samples, axis=1) + + # If any dimension has std dev greater than the threshold, split the cluster + if np.any(std_dev > settings.max_std_dev): + # Add a new centroid + new_centroid_idx = settings.n_clusters + settings = settings._replace(n_clusters=settings.n_clusters + 1) + + # Find the dimension with the largest std dev + max_std_dim = np.argmax(std_dev) + + # Create two new centroids by moving along this dimension + new_centroids = np.column_stack(( + centroids, + centroids[:, i].copy() + )) + + # Adjust the centroids along the dimension with largest variance + new_centroids[max_std_dim, i] -= std_dev[max_std_dim] + new_centroids[max_std_dim, new_centroid_idx] += std_dev[max_std_dim] + + centroids = new_centroids + + # Reassign samples to the new centroids + diff1 = data - centroids[:, i].reshape(-1, 1) + diff2 = data - centroids[:, new_centroid_idx].reshape(-1, 1) + dist1 = np.sum(diff1**2, axis=0) + dist2 = np.sum(diff2**2, axis=0) + + # Assign to the closer of the two centroids + labels[np.logical_and(labels == i, dist2 < dist1)] = new_centroid_idx + + # Break to recalculate everything with the new number of clusters + break + + # 3. Merge clusters that are close to each other + if settings.n_clusters >= 2: + # Calculate distances between all pairs of centroids + centroid_distances = np.zeros((settings.n_clusters, settings.n_clusters)) + for i in range(settings.n_clusters): + for j in range(i+1, settings.n_clusters): + centroid_distances[i, j] = np.sqrt(np.sum((centroids[:, i] - centroids[:, j])**2)) + centroid_distances[j, i] = centroid_distances[i, j] + + # Find pairs of clusters to merge (closest pairs first) + merge_candidates = [] + for i in range(settings.n_clusters): + for j in range(i+1, settings.n_clusters): + if centroid_distances[i, j] < settings.min_cluster_distance: + merge_candidates.append((i, j, centroid_distances[i, j])) + + # Sort by distance (closest first) + merge_candidates.sort(key=lambda x: x[2]) + + # Merge up to max_merge_pairs pairs + merged_clusters = set() + for i, j, _ in merge_candidates[:settings.max_merge_pairs]: + if i in merged_clusters or j in merged_clusters: + continue + + # Merge clusters i and j + # Calculate the weighted average of the centroids + ni = np.sum(labels == i) + nj = np.sum(labels == j) + + if ni == 0 or nj == 0: + continue + + new_centroid = (ni * centroids[:, i] + nj * centroids[:, j]) / (ni + nj) + + # Update centroid i with the merged centroid + centroids[:, i] = new_centroid + + # Reassign samples from cluster j to cluster i + labels[labels == j] = i + + # Mark cluster j as merged + merged_clusters.add(j) + + # Remove merged centroids + if merged_clusters: + # Convert to list and sort in descending order to avoid index issues + merged_list = sorted(list(merged_clusters), reverse=True) + for idx in merged_list: + centroids = np.delete(centroids, idx, axis=1) + # Update labels to reflect the removed centroid + for old_idx in range(idx, settings.n_clusters): + labels[labels == old_idx] = old_idx - 1 + + # Update the number of clusters + settings = settings._replace(n_clusters=settings.n_clusters - len(merged_clusters)) + + # At the end of iteration + logger.info(f"End of iteration {iteration + 1}: {settings.n_clusters} clusters") + + # Check for convergence only if the number of clusters hasn't changed + if old_n_clusters == settings.n_clusters: + if np.allclose(old_centroids[:, :settings.n_clusters], centroids): + logger.info(f"Converged at iteration {iteration + 1}") + break + # If number of clusters changed, continue to next iteration + else: + logger.info(f"Number of clusters changed from {old_n_clusters} to {settings.n_clusters}") + continue + + # Ensure labels are consecutive integers starting from 0 + unique_labels = np.unique(labels) + label_map = {old: new for new, old in enumerate(unique_labels)} + new_labels = np.array([label_map[l] for l in labels]) + logger.info(f"=== ISODATA completed with {len(unique_labels)} final clusters ===") + + return new_labels \ No newline at end of file diff --git a/cmp_viewer/clusterImgSelect.py b/cmp_viewer/dialogs.py similarity index 97% rename from cmp_viewer/clusterImgSelect.py rename to cmp_viewer/dialogs.py index 18aec42..5df51e9 100644 --- a/cmp_viewer/clusterImgSelect.py +++ b/cmp_viewer/dialogs.py @@ -1,15 +1,10 @@ -# import the necessary packages -from PyQt5.QtWidgets import QWidget -from PyQt5.QtCore import Qt -from PyQt5.QtWidgets import QPushButton -from PyQt5.QtWidgets import QVBoxLayout -from PyQt5.QtWidgets import QListWidget -from PyQt5 import QtWidgets -from PyQt5.QtCore import pyqtSignal import numpy as np -from numpy.typing import NDArray +from PyQt5 import QtWidgets +from PyQt5.QtCore import Qt from PyQt5.QtGui import QColor -from cmp_viewer.Cluster import Cluster +from PyQt5.QtWidgets import QVBoxLayout, QListWidget, QPushButton, QWidget + +from cmp_viewer.cluster_widget import Cluster from cmp_viewer.models import ImageSet """ diff --git a/cmp_viewer/mask.py b/cmp_viewer/mask.py new file mode 100644 index 0000000..96c61d4 --- /dev/null +++ b/cmp_viewer/mask.py @@ -0,0 +1,122 @@ +import os +import re +import csv +import cv2 +import numpy as np +from numpy.typing import NDArray +from typing import Dict, Tuple +from PyQt5.QtGui import QImage, QColor +from PyQt5.QtCore import Qt +from PIL import Image +from cmp_viewer import utils + +""" +Mask management, overlay creation, and export functions. +""" + + +def generate_masks(labels: NDArray[int], n_clusters: int) -> Dict[int, Tuple[NDArray[bool], QColor]]: + """ + Generate binary masks and assign a unique color for each cluster. + + This method creates a binary mask for each unique cluster label and assigns + a visually distinct color to each cluster using a perceptually-based approach. + + Args: + labels (NDArray[int]): Array of cluster labels for each pixel. + n_clusters (int): Number of clusters (may be different from actual unique labels). + + Returns: + Dict[int, Tuple[NDArray[bool], QColor]]: Dictionary mapping cluster IDs to + tuples of (binary mask, color). + """ + masks = {} + unique_labels = np.unique(labels) + + # Generate distinct colors for all unique labels + colors = utils.generate_distinct_colors(len(unique_labels)) + + for idx, cluster_id in enumerate(unique_labels): + # Create binary mask for this cluster (True where label matches cluster_id) + mask = (labels == cluster_id) + + # Assign a distinct color from our generated palette + color = colors[idx] + + masks[cluster_id] = (mask, color) + return masks + + +def create_mask_overlay(mask: NDArray[bool], color: QColor, opacity: int, + target_width: int = None, target_height: int = None) -> QImage: + """ + Create a QImage overlay for a cluster mask with specified color and opacity. + + Args: + mask (NDArray[bool]): Boolean mask for the cluster + color (QColor): Color to use for the mask + opacity (int): Opacity value (0-255) + target_width (int, optional): Target width for resizing + target_height (int, optional): Target height for resizing + + Returns: + QImage: Transparent overlay with the mask colored + """ + height, width = mask.shape + + # Resize mask if target dimensions are provided + if target_width is not None and target_height is not None: + mask_small = cv2.resize(mask.astype(np.uint8), (target_width, target_height), + interpolation=cv2.INTER_NEAREST).astype(bool) + width, height = target_width, target_height + else: + mask_small = mask + + # Create transparent overlay + overlay = QImage(width, height, QImage.Format_ARGB32) + overlay.fill(Qt.transparent) + + # Apply color to mask + # Note: QImage.Format_ARGB32 expects BGRA byte order in memory on little-endian systems. + mask_data = np.zeros((height, width, 4), dtype=np.uint8) + mask_data[mask_small, 0] = color.blue() # B + mask_data[mask_small, 1] = color.green() # G + mask_data[mask_small, 2] = color.red() # R + mask_data[mask_small, 3] = opacity # A + + # Convert to QImage + overlay_data = mask_data.tobytes() + overlay = QImage(overlay_data, width, height, QImage.Format_ARGB32) + + return overlay + + +def export_cluster_mask(cluster_id: int, output_path: str, file_format: str = "tiff"): + """ + Export a single cluster mask to a file. + + Args: + cluster_id (int): ID of the cluster to export + output_path (str): Path to save the mask + file_format (str, optional): File format (tiff, png, etc.) + + Returns: + bool: True if export was successful, False otherwise + """ + if self.masks is None or cluster_id not in self.masks: + return False + + mask, _ = self.masks[cluster_id] + if mask is None: + return False + + # Convert boolean mask to uint8 (0 or 255) + mask_array = mask.astype(np.uint8) * 255 + mask_image = Image.fromarray(mask_array, mode='L') + + # Ensure output path has correct extension + if not output_path.lower().endswith(f".{file_format.lower()}"): + output_path = f"{output_path}.{file_format.lower()}" + + mask_image.save(output_path) + return True \ No newline at end of file diff --git a/cmp_viewer/utils.py b/cmp_viewer/utils.py index 4075131..d235258 100644 --- a/cmp_viewer/utils.py +++ b/cmp_viewer/utils.py @@ -1,6 +1,8 @@ import typing +from typing import List, Tuple, NamedTuple +from PyQt5.QtGui import QColor, qRgb from PIL import Image -import numpy +import numpy as np from numpy.typing import NDArray """ @@ -31,5 +33,198 @@ def numpy_labels_to_pillow_image(input: NDArray[int]) -> Image: """ # Create a new image with the same size as the original image output_img = Image.new('P', (input.shape[1], input.shape[0])) - output_img.putdata(numpy.array(input.flat)) + output_img.putdata(np.array(input.flat)) return output_img + + +class KMeansSettings(typing.NamedTuple): + """ + A named tuple for storing K-means clustering parameters. + + Attributes: + n_clusters (int): Number of clusters to form. + init (str): Method for initialization ('random', 'k-means++', etc.). + n_init (int): Number of times the k-means algorithm will be run with different seeds. + max_iter (int): Maximum number of iterations for a single run. + tol (float): Relative tolerance for convergence. + random_state (int): Seed for random number generation for reproducibility. + """ + n_clusters: int + init: str + n_init: int + max_iter: int + tol: float + random_state: int + + +class ISODATASettings(typing.NamedTuple): + """ + A named tuple for storing ISODATA clustering parameters. + + ISODATA (Iterative Self-Organizing Data Analysis Technique) is an extension + of k-means that allows for merging and splitting of clusters based on various criteria. + + Attributes: + n_clusters (int): Initial number of clusters to form. + max_iter (int): Maximum number of iterations. + min_samples (int): Minimum number of samples in a cluster. + max_std_dev (float): Maximum standard deviation within a cluster. + min_cluster_distance (float): Minimum distance between clusters for merging. + max_merge_pairs (int): Maximum number of cluster pairs to merge per iteration. + random_state (int): Seed for random number generation for reproducibility. + """ + n_clusters: int + max_iter: int + min_samples: int + max_std_dev: float + min_cluster_distance: float + max_merge_pairs: int + random_state: int + + +def generate_distinct_colors(n_colors: int) -> List[QColor]: + """ + Generate a list of perceptually distinct colors. + + This method creates a list of colors that are visually distinct from each other, + suitable for visualizing different clusters. It uses a combination of predefined + color palettes for small numbers of clusters and algorithmic generation for larger numbers. + + Args: + n_colors (int): Number of distinct colors to generate. + + Returns: + List[QColor]: List of QColor objects representing distinct colors. + """ + # For small numbers of clusters, use a predefined set of distinct colors + # These colors are chosen to be visually distinct and colorblind-friendly + predefined_colors = [ + QColor(230, 25, 75), # Red + QColor(60, 180, 75), # Green + QColor(255, 225, 25), # Yellow + QColor(0, 130, 200), # Blue + QColor(245, 130, 48), # Orange + QColor(145, 30, 180), # Purple + QColor(70, 240, 240), # Cyan + QColor(240, 50, 230), # Magenta + QColor(210, 245, 60), # Lime + QColor(250, 190, 212), # Pink + QColor(0, 128, 128), # Teal + QColor(220, 190, 255), # Lavender + QColor(170, 110, 40), # Brown + QColor(255, 250, 200), # Beige + QColor(128, 0, 0), # Maroon + QColor(170, 255, 195), # Mint + QColor(128, 128, 0), # Olive + QColor(255, 215, 180), # Coral + QColor(0, 0, 128), # Navy + QColor(128, 128, 128), # Grey + ] + + if n_colors <= len(predefined_colors): + return predefined_colors[:n_colors] + + # For larger numbers, use HSV color space with golden ratio to distribute hues + colors = predefined_colors.copy() + + # Add more colors using the golden ratio method for hue distribution + golden_ratio_conjugate = 0.618033988749895 # 1 / phi + h = 0.1 # Starting hue + s = 0.8 # Saturation + v = 0.95 # Value + + while len(colors) < n_colors: + h = (h + golden_ratio_conjugate) % 1.0 + # Vary saturation and value slightly for better distinction + s_variation = 0.7 + (len(colors) % 3) * 0.1 + v_variation = 0.85 + (len(colors) % 2) * 0.1 + + # Convert to RGB and create QColor + h_degrees = h * 360.0 + color = QColor.fromHsv(int(h_degrees), int(s_variation * 255), int(v_variation * 255)) + colors.append(color) + + return colors + + +def create_color_table(num_labels: int) -> List[int]: + """ + Create a color table for visualizing cluster labels. + + This method generates a list of perceptually distinct colors for visualizing + cluster labels. It uses the same color generation approach as generate_masks + to ensure consistency across different visualization methods. + + Args: + num_labels (int): Number of unique labels + + Returns: + List[int]: List of RGB values as integers + """ + # Generate distinct colors using our perceptual color generation method + colors = generate_distinct_colors(num_labels) + + # Convert QColors to qRgb integers + return [qRgb(color.red(), color.green(), color.blue()) for color in colors] + + +def create_palette_from_color_table(color_table: List[int]) -> List[int]: + """ + Create a palette from a color table for use with PIL images. + + Args: + color_table (List[int]): List of RGB values as integers + + Returns: + List[int]: Flattened list of RGB values for PIL palette + """ + palette = [] + for rgb in color_table: + r = (rgb >> 16) & 0xFF + g = (rgb >> 8) & 0xFF + b = rgb & 0xFF + palette.extend([r, g, b]) + return palette + + +def prepare_label_image_for_display(img: Image.Image, num_labels: int) -> Tuple[Image.Image, List[int]]: + """ + Prepare a label image for display by setting its palette. + + Args: + img (Image.Image): PIL Image with label data + num_labels (int): Number of unique labels + + Returns: + Tuple[Image.Image, List[int]]: Tuple containing the prepared image and color table + """ + # Create color table and palette + color_table = create_color_table(num_labels) + palette = create_palette_from_color_table(color_table) + + # Convert image to palette mode if needed + if img.mode != 'P': + img = img.convert('P') + + # Set the palette + img.putpalette(palette) + + return img, color_table + + +def calculate_optimal_scale_factor(height: int, width: int, max_pixels: int = 500000) -> float: + """ + Calculate optimal scale factor to resize an image to a maximum number of pixels. + + Args: + height (int): Original height + width (int): Original width + max_pixels (int, optional): Maximum number of pixels in the resized image + + Returns: + float: Scale factor to apply + """ + if height * width <= max_pixels: + return 1.0 + + return np.sqrt(max_pixels / (height * width)) \ No newline at end of file From 01df9e9c50117b81ab7089feaeede1315c1f5643 Mon Sep 17 00:00:00 2001 From: JoyceJYW Date: Tue, 9 Dec 2025 15:34:57 -0500 Subject: [PATCH 3/4] Enable horizontal scroll bar --- cmp_viewer/ImageViewer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmp_viewer/ImageViewer.py b/cmp_viewer/ImageViewer.py index 3281eeb..e739877 100644 --- a/cmp_viewer/ImageViewer.py +++ b/cmp_viewer/ImageViewer.py @@ -100,7 +100,7 @@ def __init__(self, starting_images_folder=None): self.leftControlsScrollArea.setWidget(self.leftControlsWidget) self.leftControlsScrollArea.setWidgetResizable(True) self.leftControlsScrollArea.setFixedWidth(400) # Fixed width to prevent overlap - self.leftControlsScrollArea.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff) + self.leftControlsScrollArea.setHorizontalScrollBarPolicy(Qt.ScrollBarAsNeeded) self.leftControlsScrollArea.setVerticalScrollBarPolicy(Qt.ScrollBarAsNeeded) # Add the left controls scroll area to the main layout From 9a160ddbb10ffeb97a4880e8b26c30995b142629 Mon Sep 17 00:00:00 2001 From: JoyceJYW Date: Thu, 11 Dec 2025 11:45:56 -0500 Subject: [PATCH 4/4] Fixed refactoring --- cmp_viewer/ImageViewer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmp_viewer/ImageViewer.py b/cmp_viewer/ImageViewer.py index e739877..b1f18cf 100644 --- a/cmp_viewer/ImageViewer.py +++ b/cmp_viewer/ImageViewer.py @@ -1447,7 +1447,7 @@ def show_label_image(self, img, num_labels: int): smask = any_entry[0] height, width = smask.shape if self.clusterview is not None: - scale_factor = self.clusterview.calculate_optimal_scale_factor(height, width) + scale_factor = utils.calculate_optimal_scale_factor(height, width) else: max_pixels = 500000 scale_factor = np.sqrt(max_pixels / (height * width)) if height * width > max_pixels else 1.0 @@ -1471,7 +1471,7 @@ def show_label_image(self, img, num_labels: int): continue # Create overlay if self.clusterview is not None: - overlay = self.clusterview.create_mask_overlay( + overlay = mask_module.create_mask_overlay( mask, color, self._mask_opacity, target_width=new_width, target_height=new_height )