diff --git a/circle.tif b/circle.tif
new file mode 100644
index 000000000..e51c2a029
Binary files /dev/null and b/circle.tif differ
diff --git a/imswitch/_data/images/calibration_back.svg b/imswitch/_data/images/calibration_back.svg
new file mode 100644
index 000000000..51a9c9208
--- /dev/null
+++ b/imswitch/_data/images/calibration_back.svg
@@ -0,0 +1,7330 @@
+
+
+
+
diff --git a/imswitch/_data/images/calibration_front.svg b/imswitch/_data/images/calibration_front.svg
new file mode 100644
index 000000000..f1ae5a38a
--- /dev/null
+++ b/imswitch/_data/images/calibration_front.svg
@@ -0,0 +1,13533 @@
+
+
+
+
diff --git a/imswitch/imcontrol/_test/api/test_stage_calibration.py b/imswitch/imcontrol/_test/api/test_stage_calibration.py
new file mode 100644
index 000000000..c47d08a33
--- /dev/null
+++ b/imswitch/imcontrol/_test/api/test_stage_calibration.py
@@ -0,0 +1,251 @@
+"""
+API tests for enhanced Stage Center Calibration functionality.
+Tests the new calibration target features including manual, automatic,
+maze navigation, stepsize and wellplate calibrations.
+"""
+import pytest
+import requests
+import time
+import json
+from typing import Dict, List, Any
+from ..api import api_server, base_url
+
+
+def test_stage_calibration_discovery(api_server):
+ """Test discovery of stage calibration endpoints."""
+ response = api_server.get("/openapi.json")
+ assert response.status_code == 200
+ spec = response.json()
+
+ # Find stage calibration related endpoints
+ paths = spec.get("paths", {})
+ calibration_endpoints = [p for p in paths.keys() if "StageCenterCalibration" in p]
+
+ if not calibration_endpoints:
+ pytest.skip("No stage calibration endpoints found in API")
+
+ print(f"Found {len(calibration_endpoints)} stage calibration endpoints")
+
+ # Test required endpoints are present
+ required_endpoints = [
+ "/StageCenterCalibrationController/setKnownPosition",
+ "/StageCenterCalibrationController/getCalibrationTargetInfo",
+ "/StageCenterCalibrationController/performAutomaticCalibration",
+ "/StageCenterCalibrationController/startMaze",
+ "/StageCenterCalibrationController/stopMaze",
+ "/StageCenterCalibrationController/performStepsizeCalibration",
+ "/StageCenterCalibrationController/perform384WellplateCalibration"
+ ]
+
+ for required in required_endpoints:
+ if required in calibration_endpoints:
+ print(f"✓ Required endpoint found: {required}")
+ else:
+ print(f"? Required endpoint missing: {required}")
+
+
+def test_calibration_target_info(api_server):
+ """Test calibration target information endpoint."""
+ endpoint = "/StageCenterCalibrationController/getCalibrationTargetInfo"
+
+ try:
+ response = api_server.get(endpoint)
+ if response.status_code == 200:
+ data = response.json()
+
+ # Check required fields
+ required_fields = [
+ "width_mm", "height_mm", "frontside_svg", "backside_svg",
+ "calibration_center", "maze_start", "stepsize_grid", "wellplate_start"
+ ]
+
+ for field in required_fields:
+ assert field in data, f"Missing required field: {field}"
+ print(f"✓ Found field: {field}")
+
+ # Validate SVG content
+ assert ".
\ No newline at end of file
diff --git a/imswitch/imcontrol/controller/controllers/HistoScanController.py b/imswitch/imcontrol/controller/controllers/HistoScanController.py
index d6e3960b9..d2791e202 100644
--- a/imswitch/imcontrol/controller/controllers/HistoScanController.py
+++ b/imswitch/imcontrol/controller/controllers/HistoScanController.py
@@ -494,7 +494,7 @@ def fetchStageMap(self, resizeFactor:float=1, mapID:int=0):
return Response(im_bytes, headers=headers, media_type='image/png')
@APIExport(runOnUIThread=False)
- def getSampleLayoutFilePaths(self):
+ def getSampleLayoutFilePaths(self) -> list:
# return the paths of the sample layouts
# images are provided via imswitchserver
_baseDataFilesDir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '_data')
diff --git a/imswitch/imcontrol/controller/controllers/StageCenterCalibrationController.py b/imswitch/imcontrol/controller/controllers/StageCenterCalibrationController.py
index 5559f8170..4ad46b5ac 100644
--- a/imswitch/imcontrol/controller/controllers/StageCenterCalibrationController.py
+++ b/imswitch/imcontrol/controller/controllers/StageCenterCalibrationController.py
@@ -5,6 +5,7 @@
import numpy as np
import tifffile as tif
+import cv2
from imswitch import IS_HEADLESS
from imswitch.imcommon.framework import Signal, Mutex
@@ -34,12 +35,38 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._logger = initLogger(self)
-
# state
self._task = None
self._is_running = False
self._positions: list[tuple[float, float]] = []
self._run_mutex = Mutex()
+
+ # Load calibration settings from setup config
+ calibration_config = getattr(self._setupInfo, 'stageCalibration', {})
+
+ # Calibration target constants (in mm) - configurable with defaults
+ self.CALIBRATION_CENTER_X = calibration_config.get('calibrationCenterX', 63.81)
+ self.CALIBRATION_CENTER_Y = calibration_config.get('calibrationCenterY', 42.06)
+ self.MAZE_START_X = calibration_config.get('mazeStartX', 9.5)
+ self.MAZE_START_Y = calibration_config.get('mazeStartY', 11.5)
+ self.STEPSIZE_GRID_X = calibration_config.get('stepsizeGridX', 105.0)
+ self.STEPSIZE_GRID_Y = calibration_config.get('stepsizeGridY', 16.0)
+ self.WELLPLATE_START_X = calibration_config.get('wellplateStartX', 12.2)
+ self.WELLPLATE_START_Y = calibration_config.get('wellplateStartY', 9.0)
+ self.WELLPLATE_SPACING = calibration_config.get('wellplateSpacing', 4.5) # mm
+ self.MAX_SPEED = calibration_config.get('maxSpeed', 20000) # µm/s
+
+ # Home position - now configurable instead of FRAME-specific
+ self.HOME_POS_X = calibration_config.get('homePosX', 0) * 1000 # convert mm to µm
+ self.HOME_POS_Y = calibration_config.get('homePosY', 87) * 1000 # convert mm to µm
+
+ # Calibration target dimensions (in mm) - configurable with defaults
+ self.TARGET_WIDTH = calibration_config.get('targetWidth', 127.76)
+ self.TARGET_HEIGHT = calibration_config.get('targetHeight', 85.48)
+
+ # Maze navigation state
+ self._maze_running = False
+ self._maze_positions = []
# ───────────────────────────── API ──────────────────────────────────────
def getDetector(self):
@@ -51,127 +78,798 @@ def getStage(self):
return self._master.positionersManager[stageName]
@APIExport()
- def performCalibration(
- self,
- start_x: float,
- start_y: float,
- exposure_time_us: int = 3000,
- speed: int = 5000,
- step_um: float = 50.0,
- max_radius_um: float = 2000.0,
- brightness_factor: float = 1.4,
- ) -> list[tuple[float, float]]:
- if self._is_running:
- return self._positions
+ def getIsCalibrationRunning(self):
+ return self._is_running
- self._is_running = True
- self._positions.clear()
+ @APIExport()
+ def getCalibrationStatus(self) -> dict:
+ """
+ Get the current status of any running calibration process.
+ """
+ return {
+ "is_running": self._is_running,
+ "positions_collected": len(self._positions),
+ "last_position": self._positions[-1] if self._positions else None
+ }
- try:
- self.getDetector().setExposure(exposure_time_us)
- except AttributeError:
- pass
-
- self._task = threading.Thread(
- target=self._worker,
- args=(
- start_x,
- start_y,
- speed,
- step_um,
- max_radius_um,
- brightness_factor,
- ),
- daemon=True,
- )
- self._task.start()
- self._task.join()
- return self._positions.copy()
+ @APIExport()
+ def stopCalibration(self) -> dict:
+ """
+ Stop any running calibration process.
+ """
+ if not self._is_running:
+ return {"status": "info", "message": "No calibration is currently running"}
+
+ self._is_running = False
+ self._logger.info("Calibration process stopped by user request")
+ return {"status": "stopped", "message": "Calibration process has been stopped"}
@APIExport()
- def getIsCalibrationRunning(self):
- return self._is_running
+ def setKnownPosition(self, x_mm: float = None, y_mm: float = None):
+ """
+ Manually set the stage offset to a known position.
+ If no coordinates provided, uses the calibration center (63.81, 42.06).
+ """
+ if x_mm is None:
+ x_mm = self.CALIBRATION_CENTER_X
+ if y_mm is None:
+ y_mm = self.CALIBRATION_CENTER_Y
+
+ stage = self.getStage()
+ current_pos = stage.getPosition()
+
+ # Convert current position from steps to physical coordinates (µm)
+ # The PositionerManager expects both positions in physical coordinates
+ current_pos_x_um = current_pos["X"] / stage.stepSizes["X"] if hasattr(stage, 'stepSizes') else current_pos["X"]
+ current_pos_y_um = current_pos["Y"] / stage.stepSizes["Y"] if hasattr(stage, 'stepSizes') else current_pos["Y"]
+
+ # Set stage offset for both axes (both in µm)
+ stage.setStageOffsetAxis(knownPosition=x_mm * 1000, currentPosition=current_pos_x_um, axis="X") # Convert mm to µm
+ stage.setStageOffsetAxis(knownPosition=y_mm * 1000, currentPosition=current_pos_y_um, axis="Y") # Convert mm to µm
+
+ self._logger.info(f"Stage offset set to known position: X={x_mm}mm, Y={y_mm}mm")
+ return {"status": "success", "x_mm": x_mm, "y_mm": y_mm}
+
+ @APIExport()
+ def findCalibrationCenter(
+ self,
+ unit_um: float = 500.0,
+ increment_units: int = 1,
+ start_len_units: int = 1,
+ min_x: float = None,
+ max_x: float = None,
+ min_y: float = None,
+ max_y: float = None,
+ intensity_factor: float = 1.5,
+ settle_s: float = 0.1,
+ max_legs: int = 50,
+ laser_name: str = "LED",
+ laser_intensity: float = 1023,
+ homing_procedure: bool = False,
+ speed_um_s: float = None
+ ) -> dict:
+ """
+ API export for spiral search calibration center finding.
+ Starts the search in a separate thread and returns immediately.
+
+ Args:
+ unit_um: Base grid step in µm
+ increment_units: Increase after every two legs
+ start_len_units: Starting leg length in units
+ min_x, max_x, min_y, max_y: Absolute stage limits in µm
+ intensity_factor: Stop when mean >= factor * baseline
+ settle_s: Dwell after each move
+ max_legs: Safety cap on spiral legs
+ speed_um_s: Movement speed in µm/s (defaults to MAX_SPEED)
+
+ Returns:
+ dict: Status information
+ """
+ if speed_um_s is None:
+ speed_um_s = self.MAX_SPEED
+
+ if self._is_running:
+ return {"status": "error", "message": "Another calibration is already running"}
+ self.findCalibrationCenterThread = threading.Thread(
+ target=self._findCalibrationCenter,
+ args=(unit_um, increment_units, start_len_units, min_x, max_x, min_y, max_y, intensity_factor, settle_s, max_legs, laser_name, laser_intensity, homing_procedure, speed_um_s),
+ daemon=True,
+ )
+ self.findCalibrationCenterThread.start()
+ return {"status": "started", "message": "Calibration center search started"}
- # ──────────────────────────── worker ────────────────────────────────────
- def _worker(self, cx, cy, speed, step_um, max_r, bf):
- self.getStage().move("X", cx, True, True)
- self.getStage().move("Y", cy, True, True)
+ @APIExport()
+ def getCalibrationTargetInfo(self) -> dict:
+ """
+ Returns information about the calibration target including SVG representation.
+ SVG files are served from disk via the ImSwitch server.
+ """
+ # Get SVG files from disk
+ svg_files = self._getCalibrationSVGFiles()
+
+ # Generate fallback SVG content if files are not found
+ frontside_svg = f'''
+
+ '''
+
+ backside_svg = f'''
+
+ '''
+
+ return {
+ "width_mm": self.TARGET_WIDTH,
+ "height_mm": self.TARGET_HEIGHT,
+ "svg_file_paths": svg_files, # Paths to SVG files served by ImSwitch server
+ #"frontside_svg": frontside_svg, # Fallback SVG content
+ #"backside_svg": backside_svg, # Fallback SVG content
+ "calibration_center": {"x": self.CALIBRATION_CENTER_X, "y": self.CALIBRATION_CENTER_Y},
+ "maze_start": {"x": self.MAZE_START_X, "y": self.MAZE_START_Y},
+ "stepsize_grid": {"x": self.STEPSIZE_GRID_X, "y": self.STEPSIZE_GRID_Y},
+ "wellplate_start": {"x": self.WELLPLATE_START_X, "y": self.WELLPLATE_START_Y, "spacing": self.WELLPLATE_SPACING}
+ }
- baseline = self._grabMeanFrame()
- if baseline is None:
- self._logger.error("No detector image – aborting")
+ @APIExport()
+ def stopFindCalibrationCenter(self) -> dict:
+ """
+ Stop the ongoing calibration center finding process.
+ """
+ if not self._is_running:
+ return {"status": "info", "message": "No calibration center search is currently running"}
+
+ self._is_running = False
+ self._logger.info("Calibration center search stopped by user request")
+ return {"status": "stopped", "message": "Calibration center search has been stopped"}
+
+ def _getCalibrationSVGFiles(self) -> dict:
+ """
+ Get paths to calibration SVG files from disk.
+ Returns dictionary with file paths that can be served via ImSwitch server.
+ """
+ try:
+ _baseDataFilesDir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '_data')
+ images_dir = os.path.join(_baseDataFilesDir, 'images')
+
+ svg_files = {
+ "frontside_svg_path": None,
+ "backside_svg_path": None,
+ "available_svg_files": []
+ }
+
+ # Check if images directory exists
+ if not os.path.exists(images_dir):
+ self._logger.warning(f"Images directory not found: {images_dir}")
+ return svg_files
+
+ # Find all SVG files in directory and subfolders
+ for root, dirs, files in os.walk(images_dir):
+ for file in files:
+ if file.lower().endswith('.svg'):
+ # Get relative path from _data directory for server serving
+ relative_path = os.path.join(root.split("_data/")[-1], file)
+ svg_files["available_svg_files"].append(relative_path)
+
+ # Check for specific calibration files
+ if 'calibration_front' in file.lower() or 'front' in file.lower():
+ svg_files["frontside_svg_path"] = relative_path
+ elif 'calibration_back' in file.lower() or 'back' in file.lower():
+ svg_files["backside_svg_path"] = relative_path
+
+ self._logger.info(f"Found {len(svg_files['available_svg_files'])} SVG files in {images_dir}")
+ return svg_files
+
+ except Exception as e:
+ self._logger.error(f"Failed to get SVG files: {e}")
+ return {
+ "frontside_svg_path": None,
+ "backside_svg_path": None,
+ "available_svg_files": []
+ }
+
+ @APIExport()
+ def startMaze(self, custom_path: list = None) -> dict:
+ """
+ Start maze navigation from position (9.5, 11.5) with 1000µm steps.
+ Saves images as TIFF stack during navigation.
+ """
+ if self._maze_running:
+ return {"status": "error", "message": "Maze already running"}
+
+ self._maze_running = True
+ self._maze_positions = []
+
+ # Default maze path (can be customized)
+ if custom_path is None:
+ custom_path = [
+ (0, 0), (1, 0), (2, 0), (2, 1), (2, 2), (1, 2), (0, 2), (0, 1), # Simple square path
+ (1, 1) # End at center
+ ]
+
+ try:
+ stage = self.getStage()
+
+ # Move to maze start position
+ start_x_um = self.MAZE_START_X * 1000 # Convert mm to µm
+ start_y_um = self.MAZE_START_Y * 1000
+ stage.move(axis="X", value=start_x_um, is_absolute=True, is_blocking=True)
+ stage.move(axis="Y", value=start_y_um, is_absolute=True, is_blocking=True)
+
+ # Start maze navigation in separate thread
+ self._task = threading.Thread(target=self._navigateMaze, args=(custom_path,), daemon=True)
+ self._task.start()
+
+ return {"status": "started", "path_length": len(custom_path)}
+
+ except Exception as e:
+ self._maze_running = False
+ self._logger.error(f"Failed to start maze navigation: {e}")
+ return {"status": "error", "message": str(e)}
+
+ @APIExport()
+ def stopMaze(self) -> dict:
+ """Stop maze navigation."""
+ self._maze_running = False
+ if self._task is not None:
+ self._task.join()
+ self._task = None
+ self._logger.info("Maze navigation stopped.")
+ return {"status": "stopped", "positions_visited": len(self._maze_positions)}
+
+ @APIExport()
+ def getMazeStatus(self) -> dict:
+ """Get current maze navigation status."""
+ return {
+ "running": self._maze_running,
+ "positions_visited": len(self._maze_positions),
+ "current_position": self._maze_positions[-1] if self._maze_positions else None
+ }
+
+ @APIExport()
+ def performStepsizeCalibration(self) -> dict:
+ """
+ Perform stepsize calibration using 7x7 hole lattice at (105, 16) with 1mm spacing.
+ Captures images at each hole position and saves as TIFF stack.
+ """
+ if self._is_running:
+ return {"status": "error", "message": "Another calibration is running"}
+
+ # Start calibration in separate thread
+ self._task = threading.Thread(target=self._performStepsizeCalibrationThread, daemon=True)
+ self._task.start()
+ return {"status": "started", "message": "Stepsize calibration started"}
+
+ def _performStepsizeCalibrationThread(self):
+ """
+ Thread implementation for stepsize calibration.
+ """
+ self._is_running = True
+ try:
+ self._performStepsizeCalibrationLogic()
+ except Exception as e:
+ self._logger.error(f"Stepsize calibration failed: {e}")
+ finally:
self._is_running = False
- return
+
+ def _performStepsizeCalibrationLogic(self):
+ """
+ Core logic for stepsize calibration.
+ """
+ stage = self.getStage()
+ detector = self.getDetector()
+
+ # Move to starting position
+ start_x_um = self.STEPSIZE_GRID_X * 1000 # Convert mm to µm
+ start_y_um = self.STEPSIZE_GRID_Y * 1000
+
+ images = []
+ positions = []
+
+ # Scan 7x7 grid
+ for i in range(7):
+ for j in range(7):
+ if not self._is_running:
+ break
+
+ # Calculate position (1mm = 1000µm spacing)
+ x_pos = start_x_um + (i * 1000)
+ y_pos = start_y_um + (j * 1000)
+
+ # Move to position
+ stage.move(axis="X", value=x_pos, is_absolute=True, is_blocking=True)
+ stage.move(axis="Y", value=y_pos, is_absolute=True, is_blocking=True)
+
+ # Capture image
+ time.sleep(0.1) # Allow settling
+ frame = detector.getLatestFrame()
+ if frame is not None:
+ images.append(frame)
+ positions.append((x_pos, y_pos))
+ self._logger.debug(f"Captured image at grid position ({i}, {j})")
+ if not self._is_running:
+ self._logger.info("Stepsize calibration stopped by user")
+ return
+
+ # Save TIFF stack
+ if images:
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ dir_path = os.path.join(os.path.expanduser("~"), "imswitch_calibrations", timestamp)
+ os.makedirs(dir_path, exist_ok=True)
+
+ stack_path = os.path.join(dir_path, "stepsize_calibration_stack.tiff")
+ tif.imwrite(stack_path, np.array(images))
+
+ positions_path = os.path.join(dir_path, "stepsize_positions.csv")
+ np.savetxt(positions_path, np.array(positions), delimiter=",", header="X(µm),Y(µm)")
+
+ self._logger.info(f"Stepsize calibration completed. {len(images)} images saved to {stack_path}")
+ else:
+ self._logger.warning("No images captured during stepsize calibration")
+
+ @APIExport()
+ def perform384WellplateCalibration(self, sample_wells: list = None) -> dict:
+ """
+ Perform 384 wellplate calibration on backside pattern.
+ Scans random positions of wells A1-P24 and compares center positions.
+ """
+ if self._is_running:
+ return {"status": "error", "message": "Another calibration is running"}
+
+ self._is_running = True
+ try:
+ stage = self.getStage()
+ detector = self.getDetector()
+
+ # Default sample wells if none provided
+ if sample_wells is None:
+ # Sample some wells across the plate
+ sample_wells = ["A1", "A12", "A24", "H1", "H12", "H24", "P1", "P12", "P24"]
+
+ images = []
+ positions = []
+ well_info = []
+
+ for well in sample_wells:
+ if not self._is_running:
+ break
+
+ # Parse well coordinate (e.g., "A1" -> row=0, col=0)
+ row = ord(well[0]) - ord('A') # A=0, B=1, ..., P=15
+ col = int(well[1:]) - 1 # 1=0, 2=1, ..., 24=23
+
+ if row > 15 or col > 23: # Validate well coordinates
+ self._logger.warning(f"Invalid well coordinate: {well}")
+ continue
+
+ # Calculate position
+ x_pos = (self.WELLPLATE_START_X + col * self.WELLPLATE_SPACING) * 1000 # Convert to µm
+ y_pos = (self.WELLPLATE_START_Y + row * self.WELLPLATE_SPACING) * 1000
+
+ # Move to position
+ stage.move(axis="X", value=x_pos, is_absolute=True, is_blocking=True)
+ stage.move(axis="Y", value=y_pos, is_absolute=True, is_blocking=True)
+
+ # Capture image
+ time.sleep(0.1) # Allow settling
+ frame = detector.getLatestFrame()
+ if frame is not None:
+ images.append(frame)
+ positions.append((x_pos, y_pos))
+ well_info.append({"well": well, "row": row, "col": col, "x": x_pos, "y": y_pos})
+ self._logger.debug(f"Captured image at well {well}")
+
+ # Save TIFF stack and data
+ if images:
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ dir_path = os.path.join(os.path.expanduser("~"), "imswitch_calibrations", timestamp)
+ os.makedirs(dir_path, exist_ok=True)
+
+ stack_path = os.path.join(dir_path, "wellplate_384_calibration_stack.tiff")
+ tif.imwrite(stack_path, np.array(images))
+
+ positions_path = os.path.join(dir_path, "wellplate_positions.csv")
+ np.savetxt(positions_path, np.array(positions), delimiter=",", header="X(µm),Y(µm)")
+
+ # Save well information as JSON
+ import json
+ wells_path = os.path.join(dir_path, "wellplate_wells.json")
+ with open(wells_path, 'w') as f:
+ json.dump(well_info, f, indent=2)
+
+ self._logger.info(f"384 wellplate calibration completed. {len(images)} wells scanned.")
+ return {
+ "status": "success",
+ "wells_scanned": len(images),
+ "tiff_stack_path": stack_path,
+ "positions_path": positions_path,
+ "wells_info_path": wells_path,
+ "wells": well_info
+ }
+ else:
+ return {"status": "error", "message": "No images captured"}
+
+ except Exception as e:
+ self._logger.error(f"384 wellplate calibration failed: {e}")
+ return {"status": "error", "message": str(e)}
+ finally:
+ self._is_running = False
+
+ # ─────────────────────── new calibration helpers ───────────────────────
+
+ def _findCalibrationCenter(
+ self,
+ unit_um: float = 1000.0, # base grid step in µm
+ increment_units: int = 1, # increase after every two legs (set to 2 for 1,1,3,3,...)
+ start_len_units: int = 1, # starting leg length in units
+ min_x: float | None = None, # absolute stage limits in µm
+ max_x: float | None = None,
+ min_y: float | None = None,
+ max_y: float | None = None,
+ intensity_factor: float = 1.5, # stop when mean >= factor * baseline
+ settle_s: float = 0.1, # dwell after each move
+ max_legs: int = 400, # safety cap on spiral legs
+ laser_name: str = "LED",
+ laser_intensity: float = 1023,
+ homing_procedure = False,
+ speed_um_s: float = None # movement speed in µm/s
+ ) -> tuple[float, float] | None:
+ """
+ Spiral search around current position. Moves in a square-spiral:
+ (+X), (+Y), (-X), (-Y), increasing leg length after every two legs.
+ Leg lengths (in 'units') follow: start_len_units, start_len_units,
+ start_len_units+increment_units, start_len_units+increment_units, ...
+ Each unit corresponds to 'unit_um' micrometers.
+
+ Stops when mean intensity rises by 'intensity_factor' over the initial baseline.
+ Returns (X, Y) in µm, or None if aborted.
+ """
+ # Set running state for threading
+ self._is_running = True
+ try:
+ return self._performCalibrationSearchLogic(
+ unit_um, increment_units, start_len_units, min_x, max_x, min_y, max_y,
+ intensity_factor, settle_s, max_legs, laser_name, laser_intensity, homing_procedure, speed_um_s
+ )
+ except Exception as e:
+ self._logger.error(f"Calibration center search failed: {e}")
+ return None
+ finally:
+ self._is_running = False
+ # Save positions to CSV for record keeping
+ self._savePositionsCsv()
- directions = [(1, 0), (0, 1), (-1, 0), (0, -1)] # E, N, W, S
- dir_idx = 0
- run_len = 1
- legs_done = 0
- off_x = off_y = 0.0
+ def _performCalibrationSearchLogic(
+ self,
+ unit_um: float,
+ increment_units: int,
+ start_len_units: int,
+ min_x: float | None,
+ max_x: float | None,
+ min_y: float | None,
+ max_y: float | None,
+ intensity_factor: float,
+ settle_s: float,
+ max_legs: int,
+ laser_name: str,
+ laser_intensity: float,
+ homing_procedure: bool,
+ speed_um_s: float
+ ) -> tuple[float, float] | None:
+ """
+ Core logic for calibration search.
+ """
+ stage = self.getStage()
+ if homing_procedure:
+ self._logger.info("Homing stage...")
+ stage.home_x(isBlocking=False)
+ stage.home_y(isBlocking=True)
+ stage.home_x(isBlocking=True)
+
+ # Home the stage in X and Y
+ stage.resetStageOffsetAxis(axis="X")
+ stage.resetStageOffsetAxis(axis="Y")
+
+ # Set position to reasonable value # TODO: The interface is a bit misleading => it still computes the difference between the two positions as the offset
+ stage.setStageOffsetAxis(knownPosition = self.HOME_POS_X, currentPosition = 0, axis="X") # in µm
+ stage.setStageOffsetAxis(knownPosition = self.HOME_POS_Y, currentPosition = 0, axis="Y") # in µm
+
+ # Move to calibration center position
+ self._logger.info("Moving to calibration center position...")
+ stage.move(axis="XY", value=(self.CALIBRATION_CENTER_X*1000,self.CALIBRATION_CENTER_Y*1000), is_absolute=True, is_blocking=True, speed=self.MAX_SPEED) # in µm
+
+ # Turn on laser if specified
+ if laser_name is not None and laser_intensity is not None:
+ try:
+ if hasattr(self._master, 'lasersManager'):
+ laser_controller = self._master.lasersManager[laser_name]
+ if laser_controller:
+ laser_controller.setValue(laser_intensity)
+ laser_controller.setEnabled(True)
+ self._logger.info(f"Laser {laser_name} activated at {laser_intensity}")
+ else:
+ self._logger.warning("Laser manager not available")
+ except Exception as e:
+ self._logger.warning(f"Could not activate laser {laser_name}: {e}")
+
+ if not self._is_running:
+ return None
- while self._is_running:
- dx, dy = directions[dir_idx]
- axis = "X" if dx else "Y"
+ # ensure camera is in livemode to grab frames continuously
+ # detector = self.getDetector()
+ self._commChannel.sigStartLiveAcquistion.emit(True)
- for _ in range(run_len):
- if not self._is_running:
- break
- off_x += dx * step_um
- off_y += dy * step_um
+ # Helpers
+ def clamp(val: float, lo: float | None, hi: float | None) -> float:
+ if lo is not None and val < lo:
+ return lo
+ if hi is not None and val > hi:
+ return hi
+ return val
- if max(abs(off_x), abs(off_y)) > max_r:
- self._logger.info("Max radius reached – stop")
- self._is_running = False
- break
+ #def move_abs(axis: str, target: float) -> None:
+ # stage.move(axis=axis, value=target, is_absolute=True, is_blocking=True, speed=speed_um_s)
+
+ # Initialize movement controller for asynchronous movement
+ movement_controller = MovementController(stage)
- target = (cx + off_x) if axis == "X" else (cy + off_y)
- ctrl = MovementController(self.getStage())
- ctrl.move_to_position(target, axis=axis, speed=speed, is_absolute=True)
-
- # ───── grab frames while travelling ─────
- while not ctrl.is_target_reached() and self._is_running:
- m = self._grabMeanFrame()
- p = self.getStage().getPosition()
- self._positions.append((p["X"], p["Y"]))
- if m is not None and m >= baseline * bf:
- self._logger.info("Brightness threshold hit – done")
- self._is_running = False
- break
- time.sleep(0.002) # mild CPU relief
- if not self._is_running:
- break
+ threshold = 20 # We expect at least 20 pixels to be saturated
- if not self._is_running:
- break
+ # Spiral state
+ dirs = [(+1, 0), (0, +1), (-1, 0), (0, -1)] # +X, +Y, -X, -Y
+ dir_idx = 0
+ len_units = start_len_units
+ legs_done = 0
+ # Start from current absolute position
+ pos = stage.getPosition()
+ x = float(pos["X"])
+ y = float(pos["Y"])
+
+ # Main loop to find the whole in a spiral motion
+ while self._is_running and legs_done < max_legs:
+ dx_units, dy_units = dirs[dir_idx]
+ leg_len_um = len_units * unit_um
+ # Use continuous movement for better frame collection during movement
+ # This addresses the TODO about background movement and continuous frame analysis
+ if dx_units != 0:
+ target_x = clamp(x + dx_units * leg_len_um, min_x, max_x)
+ if target_x != x:
+ # Start asynchronous movement using MovementController
+ movement_controller.move_to_position(target_x, "X", speed_um_s, True)
+
+ # Collect frames during movement for analysis
+ while not movement_controller.is_target_reached():
+ if not self._is_running:
+ return None
+ # Grab frames during movement for analysis
+ self._grabAndProcessFrame()
+ nSaturatedPixels = self._grabAndProcessFrame()
+ self._logger.debug(f"Saturated pixels: {nSaturatedPixels} "f"at position X={x}, Y={y}")
+ if nSaturatedPixels is not None and nSaturatedPixels >= threshold:
+ movement_controller.stop_movement()
+ self._is_running = False
+ break
+ time.sleep(0.05) # Brief pause between frame grabs
+
+ x = target_x
+ else:
+ target_y = clamp(y + dy_units * leg_len_um, min_y, max_y)
+ if target_y != y:
+ # Start asynchronous movement using MovementController
+ movement_controller.move_to_position(target_y, "Y", speed_um_s, True)
+
+ # Collect frames during movement for analysis
+ while not movement_controller.is_target_reached():
+ if not self._is_running:
+ return None
+ # Grab frames during movement for analysis
+ # Grab frames during movement for analysis
+ self._grabAndProcessFrame()
+ nSaturatedPixels = self._grabAndProcessFrame()
+ self._logger.debug(f"Saturated pixels: {nSaturatedPixels} "f"at position X={x}, Y={y}")
+ if nSaturatedPixels is not None and nSaturatedPixels >= threshold:
+ movement_controller.stop_movement()
+ self._is_running = False
+ break
+ time.sleep(0.05) # Brief pause between frame grabs
+
+ y = target_y
+
+ # Next leg
dir_idx = (dir_idx + 1) % 4
legs_done += 1
- if legs_done == 2:
- legs_done = 0
- run_len += 1 # enlarge spiral
- self._savePositionsCsv()
- self._is_running = False
+ # Increase leg length after every two legs
+ if legs_done % 2 == 0:
+ len_units += increment_units
+
+
+ # TODO: We should do another round of spiral motion with half the step size to refine the position for 3x3 arms
+
+ # Measure
+ time.sleep(settle_s)
+
+ # Optional refinement if available
+ if hasattr(self, "_findRingCenter"):
+ try:
+ center = self._findRingCenter()
+ x, y = center
+ except Exception:
+ pass
+ stage.setStageOffsetAxis(knownOffset = x, axis="X") # in µm
+ stage.setStageOffsetAxis(knownOffset = y, axis="Y") # in µm
+ return (x, y)
+
+
+ def _detectWhiteLine(self, image: np.ndarray) -> bool:
+ """
+ Detect white lines in image using Hough transform.
+ Returns True if lines are detected.
+ """
+ try:
+ # Convert to grayscale if needed
+ if len(image.shape) == 3:
+ gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
+ else:
+ gray = image.astype(np.uint8)
+
+ # Apply edge detection
+ edges = cv2.Canny(gray, 50, 150, apertureSize=3)
+
+ # Apply Hough line transform
+ lines = cv2.HoughLines(edges, 1, np.pi/180, threshold=100)
+
+ return lines is not None and len(lines) > 0
+
+ except Exception as e:
+ self._logger.error(f"Line detection failed: {e}")
+ return False
+
+ def _findRingCenter(self) -> tuple:
+ """
+ Find center of rings using Hough circle transform.
+ Returns (x, y) stage coordinates or None if not found.
+ """
+ try:
+ detector = self.getDetector()
+ frame = detector.getLatestFrame()
+ if frame is None:
+ return None
+
+ # Convert to grayscale if needed
+ if len(frame.shape) == 3:
+ gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
+ else:
+ gray = frame.astype(np.uint8)
+ import NanoImagingPack as nip
+ blurred = nip.gaussf(gray, 100)
+ maxY, maxX = np.unravel_index(np.argmax(blurred, axis=None), blurred.shape)
+ # now let's move in the opposite direction of the maximum
+ mPixelSize = detector.pixelSizeUm[0]
+ center_y, center_x = maxY*mPixelSize, maxX*mPixelSize
+ stage = self.getStage()
+
+ # Convert image coordinates to stage offset
+ # This is a simplified approach - in practice, you'd need proper calibration
+ current_pos = stage.getPosition()
+
+ self._logger.info(f"Ring detected at image coords ({center_x}, {center_y})")
+
+ # Calculate offset needed to center the ring (simplified pixel-to-micron conversion)
+ image_center_x = gray.shape[1] // 2
+ image_center_y = gray.shape[0] // 2
+
+ # Assume 1 pixel = 1 micrometer (this should be calibrated properly)
+ offset_x = (center_x - image_center_x) * 1.0 # Adjust this scaling factor
+ offset_y = (center_y - image_center_y) * 1.0
+
+ # Move stage to center the ring
+ target_x = current_pos["X"] - offset_x # Negative because stage moves opposite to image
+ target_y = current_pos["Y"] - offset_y
+
+ stage.move(axis="X", value=target_x, is_absolute=True, is_blocking=True)
+ stage.move(axis="Y", value=target_y, is_absolute=True, is_blocking=True)
+
+ final_pos = stage.getPosition()
+ return (final_pos["X"], final_pos["Y"])
+
+
+ except Exception as e:
+ self._logger.error(f"Ring detection failed: {e}")
+ return None
+
+ def _navigateMaze(self, path: list):
+ """
+ Navigate through maze path, capturing images at each position.
+ """
+ stage = self.getStage()
+ detector = self.getDetector()
+
+ images = []
+ positions = []
+
+ try:
+ for step_idx, (dx, dy) in enumerate(path):
+ if not self._maze_running:
+ break
+
+ # Calculate target position (1000µm steps)
+ start_x = self.MAZE_START_X * 1000
+ start_y = self.MAZE_START_Y * 1000
+ target_x = start_x + (dx * 1000)
+ target_y = start_y + (dy * 1000)
+
+ # Move to position
+ stage.move(axis="X", value=target_x, is_absolute=True, is_blocking=True)
+ stage.move(axis="Y", value=target_y, is_absolute=True, is_blocking=True)
+
+ # Capture image
+ time.sleep(0.1) # Allow settling
+ frame = detector.getLatestFrame()
+ if frame is not None:
+ images.append(frame)
+ positions.append((target_x, target_y))
+ self._maze_positions.append((target_x, target_y))
+ self._logger.debug(f"Maze step {step_idx + 1}/{len(path)}: moved to ({target_x}, {target_y})")
+
+ # Save TIFF stack
+ if images:
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ dir_path = os.path.join(os.path.expanduser("~"), "imswitch_calibrations", timestamp)
+ os.makedirs(dir_path, exist_ok=True)
+
+ stack_path = os.path.join(dir_path, "maze_navigation_stack.tiff")
+ tif.imwrite(stack_path, np.array(images))
+
+ positions_path = os.path.join(dir_path, "maze_positions.csv")
+ np.savetxt(positions_path, np.array(positions), delimiter=",", header="X(µm),Y(µm)")
+
+ self._logger.info(f"Maze navigation completed. {len(images)} images saved to {stack_path}")
+
+ except Exception as e:
+ self._logger.error(f"Maze navigation failed: {e}")
+ finally:
+ self._maze_running = False
- @APIExport()
- def stopCalibration(self):
- """Stops the calibration process."""
- self._is_running = False
- if self._task is not None:
- self._task.join()
- self._task = None
- self._logger.info("Calibration stopped.")
# ─────────────────────── helpers ────────────────────────────────────────
- def _grabMeanFrame(self):
+ def _grabAndProcessFrame(self, threshold=250):
+ '''returns the number of saturated pixels in the latest frame'''
frame = self.getDetector().getLatestFrame()
if frame is None or frame.size == 0:
- return None
- meanValue = np.mean(frame[::20, ::20]) # subsample for speed
- self._logger.debug(f"Mean value of frame: {meanValue}")
- return meanValue
+ return 0 #None
+ processedValue = np.sum(frame[::20, ::20]>threshold)
+ self._logger.debug(f"Processed value of frame: {processedValue}")
+ return processedValue
def _savePositionsCsv(self):
if not self._positions:
@@ -214,6 +912,11 @@ def move_to_position(self, value, axis, speed, is_absolute):
def _move(self, value, axis, speed, is_absolute):
self.stage.move(axis=axis, value=value, speed=speed, is_absolute=is_absolute, is_blocking=True)
self._done = True
+
+ def stop_movement(self):
+ # Implement stopping logic if supported by stage
+ self._done = True
+ self.stage.stopAll()
def is_target_reached(self):
return self._done
diff --git a/imswitch/imcontrol/model/managers/positioners/ESP32StageManager.py b/imswitch/imcontrol/model/managers/positioners/ESP32StageManager.py
index 9a921dd37..f0069d1a8 100644
--- a/imswitch/imcontrol/model/managers/positioners/ESP32StageManager.py
+++ b/imswitch/imcontrol/model/managers/positioners/ESP32StageManager.py
@@ -4,6 +4,8 @@
import numpy as np
import os
import json
+from typing import Dict, List, Optional, Union
+
MAX_ACCEL = 1000000
PHYS_FACTOR = 1
@@ -495,12 +497,30 @@ def moveToSampleMountingPosition(self, speed=10000, is_blocking=True):
value = (self.sampleLoadingPositions["X"], self.sampleLoadingPositions["Y"], self.sampleLoadingPositions["Z"])
self._motor.move_xyz(value, speed, is_absolute=True, is_blocking=is_blocking)
- def setStageOffsetAxis(self, knownOffset:float=None, axis="X"):
+ def setStageOffsetAxis(self, knownPosition:float=0, currentPosition:Optional[float]=None, knownOffset:Optional[float]=None, axis:str="X"):
+ # differentiate between different cases
+ if knownOffset is not None:
+ # case 0: only knownOffset is given -> use it
+ knownOffset = knownOffset
+ elif currentPosition is not None and knownPosition is not None:
+ # case 1: knownPosition and currentPosition are given -> calculate offset
+ knownOffset = currentPosition - knownPosition
+ elif currentPosition is None and knownOffset is not None:
+ # case 2: knownPosition and knownOffset are given -> calculate currentPosition
+ currentPosition = knownPosition - knownOffset
+ elif currentPosition is not None and knownOffset is not None:
+ # case 3: all three values are given -> check if they are consistent
+ if not np.isclose(knownPosition, currentPosition + knownOffset):
+ self.__logger.warning(f"Given values for knownPosition ({knownPosition}), currentPosition ({currentPosition}) and knownOffset ({knownOffset}) are not consistent!")
+ else:
+ self.__logger.error("Not enough information to set stage offset. Please provide either knownOffset or both knownPosition and currentPosition.")
+ return
try:
self.stageOffsetPositions[axis] = knownOffset
except KeyError:
self.__logger.error(f"Axis {axis} not found in stageOffsetPositions.")
- self.__logger.info(f"Set offset for {axis} axis to {knownOffset} mum.")
+ self.__logger.info(f"Set offset for {axis} axis to {knownOffset} mum, your position is now: {self.getPosition()[axis]-knownOffset} mum")
+ # set the offset on the device
self._motor.set_offset(axis=axis, offset=knownOffset)
def getStageOffsetAxis(self, axis:str="X"):
diff --git a/imswitch/imcontrol/model/managers/positioners/VirtualStageManager.py b/imswitch/imcontrol/model/managers/positioners/VirtualStageManager.py
index 9f9274712..ad0f3aa96 100644
--- a/imswitch/imcontrol/model/managers/positioners/VirtualStageManager.py
+++ b/imswitch/imcontrol/model/managers/positioners/VirtualStageManager.py
@@ -29,21 +29,19 @@ def __init__(self, positionerInfo, name, **lowLevelManagers):
def move(self, value=0, axis="X", is_absolute=False, is_blocking=True, acceleration=None, speed=None, isEnable=None, timeout=1):
if axis == "X":
self._positioner.move(x=value+self.offset_x, is_absolute=is_absolute)
- if axis == "Y":
+ elif axis == "Y":
self._positioner.move(y=value+self.offset_y, is_absolute=is_absolute)
- if axis == "Z":
+ elif axis == "Z":
self._positioner.move(z=value+self.offset_z, is_absolute=is_absolute)
- if axis == "A":
+ elif axis == "A":
self._positioner.move(a=value+self.offset_a, is_absolute=is_absolute)
- if axis == "XYZ":
+ elif axis == "XYZ":
self._positioner.move(x=value[0]+self.offset_x, y=value[1]+self.offset_y, z=value[2]+self.offset_z, is_absolute=is_absolute)
- if axis == "XY":
+ elif axis == "XY":
self._positioner.move(x=value[0]+self.offset_x, y=value[1]+self.offset_y, is_absolute=is_absolute)
for axes in ["A","X","Y","Z"]:
- self._position[axes] = self._positioner.position[axes]
-
- self.getPosition() # update position in GUI
-
+ self.setPosition(axis=axes, value=self._positioner.position[axes])
+
def setPositionOnDevice(self, axis, value):
if axis == "X":
self._positioner.move(x=value, is_absolute=True)
@@ -58,7 +56,7 @@ def setPositionOnDevice(self, axis, value):
if axis == "XY":
self._positioner.move(x=value[0], y=value[1], is_absolute=True)
for axes in ["A","X","Y","Z"]:
- self._position[axes] = self._positioner.position[axes]
+ self.setPosition(axis=axes, value=self._positioner.position[axes])
#self._commChannel.sigUpdateMotorPosition.emit()
def moveForever(self, speed=(0, 0, 0, 0), is_stop=False):
@@ -68,7 +66,9 @@ def setSpeed(self, speed, axis=None):
pass
def setPosition(self, value, axis):
- pass
+ self._position[axis] = value
+ posDict = {"ESP32Stage": {axis: value}}
+ self._commChannel.sigUpdateMotorPosition.emit(posDict)
def getPosition(self):
# load position from device
@@ -116,21 +116,22 @@ def doHome(self, axis, isBlocking=False):
if axis == "Z": self.home_z(isBlocking)
- def home_x(self, isBlocking):
+ def home_x(self, isBlocking=False):
self.move(value=0, axis="X", is_absolute=True)
self.setPosition(axis="X", value=0)
- def home_y(self,isBlocking):
+ def home_y(self, isBlocking=False):
self.move(value=0, axis="Y", is_absolute=True)
self.setPosition(axis="Y", value=0)
- def home_z(self,isBlocking):
+ def home_z(self, isBlocking=False):
self.move(value=0, axis="Z", is_absolute=True)
self.setPosition(axis="Z", value=0)
def home_xyz(self):
if self.homeXenabled and self.homeYenabled and self.homeZenabled:
[self.setPosition(axis=axis, value=0) for axis in ["X","Y","Z"]]
+
def setStageOffset(self, axis, offset):
diff --git a/imswitch/teststagecenter.py b/imswitch/teststagecenter.py
new file mode 100644
index 000000000..f7d813de3
--- /dev/null
+++ b/imswitch/teststagecenter.py
@@ -0,0 +1,11 @@
+#%%
+import numpy as numpy
+import NanoImagingPack as nip
+import tifffile as tif
+
+
+
+#read image
+image = tif.imread('circle.tif')
+
+#%%
\ No newline at end of file
diff --git a/main.py b/main.py
index 717171f7d..c86abaaee 100644
--- a/main.py
+++ b/main.py
@@ -15,6 +15,7 @@
sudo firewall-cmd --zone=public --add-port=8001/tcp; sudo firewall-cmd --zone=nm-shared --add-port=8001/tcp
sudo firewall-cmd --zone=public --add-port=8002/tcp; sudo firewall-cmd --zone=nm-shared --add-port=8002/tcp
+ sudo firewall-cmd --zone=public --add-port=8888/tcp; sudo firewall-cmd --zone=nm-shared --add-port=8888/tcp
'''
# DON'T CHANGE THIS!!!!
# This has to be maintained for DOCKER!