diff --git a/docs/gui.md b/docs/gui.md index 87cba280..f8815d17 100644 --- a/docs/gui.md +++ b/docs/gui.md @@ -181,8 +181,8 @@ For optimum performance, the FastADT frame uses three separate TEM setting which **Diffraction exposure (s)** : The time taken to collect each diffraction image in seconds. In the `continuous` mode it will additionally dictate the rotation speed. -**Tracking mode** -: Dictates whether `none` or `manual` tracking is to be performed at the start of the experiment. +**Tracking algorithm** +: Dictates whether `none` or `manual` tracking algorithm is to be performed at the start of the experiment to determine pathing. **Tracking step (deg)** : The target spacing between angles at which subsequent tracking images are collected within the tracking series in degrees. diff --git a/src/instamatic/calibrate/calibrate_beamshift.py b/src/instamatic/calibrate/calibrate_beamshift.py index 06bcdf11..de2f9066 100644 --- a/src/instamatic/calibrate/calibrate_beamshift.py +++ b/src/instamatic/calibrate/calibrate_beamshift.py @@ -57,12 +57,12 @@ class CalibBeamShift: def __repr__(self): return f'CalibBeamShift(transform=\n{self.transform},\n reference_shift=\n{self.reference_shift},\n reference_pixel=\n{self.reference_pixel})' - def beamshift_to_pixelcoord(self, beamshift: Sequence[float, float]) -> Vector2: + def beamshift_to_pixelcoord(self, beamshift: Sequence[float]) -> Vector2: """Converts from beamshift x,y to pixel coordinates.""" r_i = np.linalg.inv(self.transform) return np.dot(self.reference_shift - np.array(beamshift), r_i) + self.reference_pixel - def pixelcoord_to_beamshift(self, pixelcoord: Sequence[float, float]) -> Vector2: + def pixelcoord_to_beamshift(self, pixelcoord: Sequence[float]) -> Vector2: """Converts from pixel coordinates to beamshift x,y.""" pc = np.array(pixelcoord) return self.reference_shift - np.dot(pc - self.reference_pixel, self.transform) diff --git a/src/instamatic/calibrate/calibrate_stage_rotation.py b/src/instamatic/calibrate/calibrate_stage_rotation.py index 75e53d4f..546f3aa0 100644 --- a/src/instamatic/calibrate/calibrate_stage_rotation.py +++ b/src/instamatic/calibrate/calibrate_stage_rotation.py @@ -137,7 +137,7 @@ def speed_time_to_span(self, speed: float, time: float) -> float: def plan_rotation(self, target_pace: float) -> RotationPlan: """Given target pace in sec / deg, find nearest pace, speed, delay.""" - target_speed = self.alpha_pace / target_pace # exact speed setting needed + target_speed = abs(self.alpha_pace / target_pace) # exact speed setting needed nearest_speed = self.speed_options.nearest(target_speed) # nearest setting nearest_pace = self.alpha_pace / nearest_speed # nearest in sec/deg total_delay = self.alpha_windup / nearest_speed + self.delay diff --git a/src/instamatic/camera/videostream.py b/src/instamatic/camera/videostream.py index 13a4d7dd..858cde89 100644 --- a/src/instamatic/camera/videostream.py +++ b/src/instamatic/camera/videostream.py @@ -160,6 +160,10 @@ def unblock(self): def blocked(self): yield + @contextmanager + def unblocked(self): + yield + class LiveVideoStream(VideoStream): """Handle the continuous stream of incoming data from the ImageGrabber.""" @@ -237,6 +241,17 @@ def blocked(self): if not was_set_before: self.grabber.continuousCollectionEvent.clear() + @contextmanager + def unblocked(self): + """Clear `continuousCollectionEvent` in the statement scope only.""" + was_set_before = self.grabber.continuousCollectionEvent.is_set() + try: + self.grabber.continuousCollectionEvent.clear() + yield + finally: + if was_set_before: + self.grabber.continuousCollectionEvent.set() + def show_stream(self): from instamatic.gui import videostream_frame diff --git a/src/instamatic/experiments/fast_adt/experiment.py b/src/instamatic/experiments/fast_adt/experiment.py index 523e92ce..f3dd7794 100644 --- a/src/instamatic/experiments/fast_adt/experiment.py +++ b/src/instamatic/experiments/fast_adt/experiment.py @@ -1,11 +1,16 @@ from __future__ import annotations import logging -from dataclasses import dataclass, field +from collections import deque +from contextlib import contextmanager +from copy import deepcopy +from dataclasses import dataclass, field, replace +from math import ceil from pathlib import Path from queue import Queue from threading import Thread -from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, Union +from tkinter import StringVar +from typing import Any, Iterator, Optional, Sequence, Union, cast import numpy as np import pandas as pd @@ -18,12 +23,19 @@ from instamatic.calibrate import CalibBeamShift, CalibMovieDelays, CalibStageRotation from instamatic.calibrate.filenames import CALIB_BEAMSHIFT from instamatic.experiments.experiment_base import ExperimentBase +from instamatic.gui.click_dispatcher import MouseButton from instamatic.processing.ImgConversionTPX import ImgConversionTPX as ImgConversion +from instamatic.utils.iterating import sawtooth -def safe_range(*, start: float, stop: float, step: float) -> np.ndarray: +def get_color(i: int) -> tuple[int, int, int]: + """Return i-th color from matplotlib colormap tab10 as accepted by PIL.""" + return tuple([int(rgb * 255) for rgb in plt.get_cmap('tab10')(i % 10)][:3]) # type: ignore + + +def safe_range(start: float, stop: float, step: float) -> np.ndarray: """Find 2+ floats between `start` and `stop` (inclusive) ~`step` apart.""" - step_count = max(round(abs(stop - start) / step) + 1, 2) + step_count = max(ceil(abs((stop - start) / step)) + 1, 2) return np.linspace(start, stop, step_count, endpoint=True, dtype=float) @@ -41,13 +53,17 @@ class Step: Index: int alpha: float + beampixel_x: Optional[float] = None + beampixel_y: Optional[float] = None beamshift_x: Optional[float] = None beamshift_y: Optional[float] = None - delta_x: Optional[float] = None - delta_y: Optional[float] = None image: Optional[np.ndarray] = None meta: dict = field(default_factory=dict) + @property + def summary(self) -> str: + return f'Step(Index={self.Index}, alpha={self.alpha})' + class Run: """Collection of details of a generalized single FastADT run. @@ -61,8 +77,9 @@ class Run: table: pd.DataFrame Describes details of individual steps (to be) measured: - alpha - average value of the rotation axes for given frame - - delta_x - x beam shift relative from center needed to track the crystal - - delta_y - y beam shift relative from center needed to track the crystal + - beampixel_x/y - beam x/y position in pixel used for tracking + - beamshift_x/y - beam deflector x/y value used for tracking + - image, meta - tracking or diffraction image and its header data """ def __init__(self, exposure=1.0, continuous=False, **columns: Sequence) -> None: @@ -70,20 +87,21 @@ def __init__(self, exposure=1.0, continuous=False, **columns: Sequence) -> None: self.continuous: bool = continuous self.table: pd.DataFrame = pd.DataFrame.from_dict(columns) - @property - def scope(self) -> Tuple[float, float]: - """The range of alpha values scanned during the entire run.""" - a = self.table['alpha'] - if not self.continuous: - return a.iloc[0], a.iloc[-1] - return a.iloc[0] - self.osc_angle / 2, a.iloc[-1] + self.osc_angle / 2 + def __str__(self) -> str: + c = self.__class__.__name__ + a = self.table['alpha'].values + ar = f'range({a[0]:.3g}, {a[-1]:.3g}, {float(np.mean(np.diff(a))):.3g})' + return f'{c}(exposure={self.exposure:.3g}, continuous={self.continuous}, alpha={ar})' + + def __len__(self) -> int: + return len(self.table) @property def steps(self) -> Iterator[Step]: """Iterate over individual run `Step`s holding rows of `self.table`.""" return (Step(**t._asdict()) for t in self.table.itertuples()) # noqa - def interpolate(self, at: np.array, key: str) -> np.ndarray: + def interpolate(self, key: str, at: np.array) -> np.ndarray: """Interpolate values of `table[key]` at some denser grid of points.""" alpha, values = self.table['alpha'], self.table[key] if at[0] > at[-1]: # decreasing order is not handled by numpy.interp @@ -91,71 +109,70 @@ def interpolate(self, at: np.array, key: str) -> np.ndarray: return np.interp(at, alpha, values) @property - def buffer(self) -> List[Tuple[int, np.ndarray, dict]]: + def buffer(self) -> list[tuple[int, np.ndarray, dict]]: """Standardized list of (number, image, meta) used when saving.""" return [(i, s.image, s.meta) for i, s in enumerate(self.steps)] @property - def has_beam_delta_information(self) -> bool: - return {'delta_x', 'delta_y'}.issubset(self.table.columns) + def has_beamshifts(self) -> bool: + return {'beamshift_x', 'beamshift_y'}.issubset(self.table.columns) @property def osc_angle(self) -> float: """Difference of alpha angle between two consecutive frames.""" - a = list(self.table['alpha']) - return (a[-1] - a[0]) / (len(a) - 1) if len(a) > 1 else -1 - - def to_continuous(self) -> Self: - """Construct a new run from N-1 first rows for continuous method.""" - new_alphas = self.table['alpha'].rolling(2).mean().drop(0) - new_cols = self.table.iloc[:-1, :].to_dict(orient='list') - del new_cols['alpha'] - c = self.__class__ - return c(exposure=self.exposure, continuous=True, alpha=new_alphas, **new_cols) - - def calculate_beamshifts(self, ctrl, beamshift) -> None: - """Note CalibBeamShift uses swapped axes: X points down, Y right.""" - beamshift_xy = ctrl.beamshift.get() - pixelcoord_xy = beamshift.beamshift_to_pixelcoord(beamshift_xy) - delta_xys = self.table[['delta_x', 'delta_y']].to_numpy() - crystal_xys = pixelcoord_xy + delta_xys - crystal_yxs = np.fliplr(crystal_xys) - beamshifts = beamshift.pixelcoord_to_beamshift(crystal_yxs) - self.table[['beamshift_x', 'beamshift_y']] = beamshifts + a = self.table['alpha'].values + return (a[-1] - a[0]) / (len(a) - 1) if len(a) > 1 else 0 + + def collapse_to_alpha_midpoints(self) -> None: + """Set current alpha midpoints as new alpha, dropping the first row.""" + alpha_midpoints = self.table['alpha'].rolling(2).mean().drop(0) + self.table = self.table.iloc[1:] + self.table['alpha'] = alpha_midpoints + + def update_images_metas(self, steps: Queue[Union[Step, None]]) -> None: + """Consume Steps from queue until None, update self.images & .meta.""" + step_list: list[Step] = [] + while True: + step = steps.get() + if step is None: + break + step_list.append(step) + self.table['image'] = [s.image for s in step_list] + self.table['meta'] = [s.meta for s in step_list] class TrackingRun(Run): - """Designed to estimate delta_x/y a priori based on manual used input.""" + """Designed to estimate beampixel_x/y a priori based on manual input.""" @classmethod - def from_params(cls, params: Dict[str, Any]) -> Self: - alpha_range = safe_range( - start=params['diffraction_start'], - stop=params['diffraction_stop'], - step=params['tracking_step'], - ) - return cls(exposure=params['tracking_time'], alpha=alpha_range) + def from_params(cls, p: dict[str, Any]) -> Self: + a = safe_range(p['diffraction_start'], p['diffraction_stop'], p['tracking_step']) + return cls(exposure=p['tracking_time'], continuous=False, alpha=a) class DiffractionRun(Run): """The implementation for the actual diffraction experiment itself.""" @classmethod - def from_params( - cls, - params: Dict[str, Any], - tracking_run: Optional['TrackingRun'] = None, - ) -> Self: - alpha_range = safe_range( - start=params['diffraction_start'], - stop=params['diffraction_stop'], - step=params['diffraction_step'], - ) - run = cls(exposure=params['diffraction_time'], alpha=alpha_range) - if tracking_run is not None: - run.table['delta_x'] = tracking_run.interpolate(alpha_range, 'delta_x') - run.table['delta_y'] = tracking_run.interpolate(alpha_range, 'delta_y') - return run + def from_params(cls, p: dict[str, Any]) -> Self: + c = p['diffraction_mode'] == 'continuous' + a = safe_range(p['diffraction_start'], p['diffraction_stop'], p['diffraction_step']) + return cls(exposure=p['diffraction_time'], continuous=c, alpha=a) + + def add_beamshifts(self, pathing_run: TrackingRun) -> None: + """Add and interpolate delta x/y info from another run instance.""" + a = self.table['alpha'].values + self.table['beamshift_x'] = pathing_run.interpolate('beamshift_x', at=a) + self.table['beamshift_y'] = pathing_run.interpolate('beamshift_y', at=a) + + +@dataclass +class Runs: + """Collection of runs for xtal tracking, beam pathing, diff collection.""" + + tracking: Optional[Run] = None + pathing: list[TrackingRun] = field(default_factory=list) + diffraction: list[DiffractionRun] = field(default_factory=list) class Experiment(ExperimentBase): @@ -174,7 +191,7 @@ class Experiment(ExperimentBase): experiment_frame: Optional instance of `ExperimentalFastADT` used to display messages videostream_frame: - Optional instance of `VideoStreamFrame` used to display messages + Optional instance of `VideoStreamFrame` to display tracking and images """ name = 'FastADT' @@ -191,20 +208,11 @@ def __init__( super().__init__() self.ctrl = ctrl self.path = Path(path) - - self.mrc_path = self.path / 'mrc' - self.tiff_path = self.path / 'tiff' - self.tiff_image_path = self.path / 'tiff_image' - self.mrc_path.mkdir(exist_ok=True, parents=True) - self.tiff_path.mkdir(exist_ok=True, parents=True) - self.tiff_image_path.mkdir(exist_ok=True, parents=True) - self.log = log or NullLogger() self.flatfield = flatfield self.fast_adt_frame = experiment_frame self.beamshift: Optional[CalibBeamShift] = None self.camera_length: int = 0 - self.diffraction_mode: str = '' if videostream_frame is not None: d = videostream_frame.click_dispatcher @@ -215,8 +223,8 @@ def __init__( self.click_listener = None self.videostream_processor = None - self.steps_queue: Queue[Union[Step, None]] = Queue() - self.run: Optional[Run] = None + self.steps: Queue[Union[Step, None]] = Queue() + self.runs: Runs = Runs() def restore_fast_adt_diff_for_image(self): """Restore 'FastADT_diff' config with 'FastADT_track' magnification.""" @@ -233,9 +241,9 @@ def get_beamshift(self) -> CalibBeamShift: try: return CalibBeamShift.from_file(calib_dir / CALIB_BEAMSHIFT) except OSError: - return CalibBeamShift.live( - self.ctrl, outdir=calib_dir, vsp=self.videostream_processor - ) + self.msg1('Focus and center the beam, and check terminal for instructions.') + vsp = self.videostream_processor + return CalibBeamShift.live(self.ctrl, outdir=calib_dir, vsp=vsp) def get_dead_time( self, @@ -248,36 +256,54 @@ def get_dead_time( return self.ctrl.cam.dead_time except AttributeError: pass - self.msg('`cam.dead_time` not found. Looking for calibrated estimate...') + self.msg2('`cam.dead_time` not found. Looking for calibrated estimate...') try: c = CalibMovieDelays.from_file(exposure, header_keys_variable, header_keys_common) except RuntimeWarning: return 0.0 else: return c.dead_time + finally: + self.msg2('') def get_stage_rotation(self) -> CalibStageRotation: """Get rotation calibration if present; otherwise warn & terminate.""" try: return CalibStageRotation.from_file() except OSError: - msg = ( - 'Collecting cRED with this script requires calibrated stage rotation. ' - 'Please run `instamatic.calibrate_stage_rotation` first.' - ) - self.msg(msg) - raise FastADTMissingCalibError(msg) - - def msg(self, text: str) -> None: - """Display a message in log.info, consoles & FastADT frame at once.""" + self.msg1(m1 := 'This script requires stage rotation to be calibrated.') + self.msg2(m2 := 'Please run `instamatic.calibrate_stage_rotation` first.') + raise FastADTMissingCalibError(m1 + ' ' + m2) + + def determine_rotation_speed_and_exposure(self, run: Run) -> tuple[float, float]: + """Closest possible speed setting & exposure considering dead time.""" + detector_dead_time = self.get_dead_time(run.exposure) + time_for_one_frame = run.exposure + detector_dead_time + rot_calib = self.get_stage_rotation() + rot_plan = rot_calib.plan_rotation(time_for_one_frame / run.osc_angle) + exposure = abs(rot_plan.pace * run.osc_angle) - detector_dead_time + return rot_plan.speed, exposure + + def _message(self, text: str, var: Optional[StringVar]) -> None: + """Display text in log.info, consoles, FastADT frame msg area 1/2.""" try: - self.fast_adt_frame.message.set(text) + var.set(text) except AttributeError: pass - print(text) if text: + print(text) self.log.info(text) + def msg1(self, text: str) -> None: + """Display in message area 1 with persistent status & instructions.""" + var = self.fast_adt_frame.message1 if self.fast_adt_frame else None + return self._message(text, var=var) + + def msg2(self, text: str) -> None: + """Display in message area 2 with the most recent tem/cam updates.""" + var = self.fast_adt_frame.message2 if self.fast_adt_frame else None + return self._message(text, var=var) + def start_collection(self, **params) -> None: """Collect FastADT experiment according to provided **params. @@ -300,183 +326,186 @@ def start_collection(self, **params) -> None: Finally, the collected run will be logged and the stage - reset. """ - self.msg('FastADT experiment started') - with self.ctrl.beam.blanked(): - image_path = self.tiff_image_path / 'image.tiff' - if not image_path.exists(): - self.ctrl.restore('FastADT_image') - with self.ctrl.beam.unblanked(delay=0.2): - self.ctrl.get_image(params['tracking_time'], out=image_path) - - if params['tracking_mode'] == 'manual': - tracking_run = TrackingRun.from_params(params) - self.collect_manual_tracking(tracking_run) - else: - tracking_run = None + self.msg1('Collecting crystal image.') + self.msg2('') + image_path = self.path / 'image.tiff' + if not image_path.exists(): + self.ctrl.restore('FastADT_image') + with self.ctrl.beam.unblanked(delay=0.2): + self.ctrl.get_image(params['tracking_time'], out=image_path) + + with self.ctrl.beam.blanked(), self.ctrl.cam.blocked(): + if params['tracking_algo'] == 'manual': + self.runs.tracking = TrackingRun.from_params(params) + self.determine_pathing_manually() + for pathing_run in self.runs.pathing: + new_run = DiffractionRun.from_params(params) + new_run.add_beamshifts(pathing_run) + self.runs.diffraction.append(new_run) + if not self.runs.pathing: + self.runs.diffraction = [DiffractionRun.from_params(params)] - self.run = DiffractionRun.from_params(params, tracking_run) self.ctrl.restore('FastADT_diff') self.camera_length = int(self.ctrl.magnification.get()) - self.diffraction_mode = params['diffraction_mode'] - if self.diffraction_mode == 'stills': - self.collect_stills(self.run) - elif self.diffraction_mode == 'continuous': - self.collect_continuous(self.run) + n_runs = len(self.runs.diffraction) + for ir, run in enumerate(self.runs.diffraction): + suffix = f' ({ir + 1}/{n_runs})' if n_runs > 1 else '' + self.msg1(f'Collecting {run!s}.{suffix}') + self.collect_run(run) + self.msg1(f'Finalizing {run!s}.{suffix}') + run.update_images_metas(self.steps) + self.finalize(run) - print(self.run.table) self.ctrl.restore('FastADT_image') - self.log.info('Collected the following run:') - self.log.info(str(self.run)) self.ctrl.stage.a = 0.0 - def collect_manual_tracking(self, run: TrackingRun) -> None: + @contextmanager + def displayed_pathing(self, step: Step) -> None: + """Display step image with dots representing existing pathing.""" + draw = self.videostream_processor.draw + instructions: list[draw.Instruction] = [] + for run_i, p in enumerate(self.runs.pathing): + x = p.table.at[step.Index, 'beampixel_x'] + y = p.table.at[step.Index, 'beampixel_y'] + instructions.append(draw.circle((x, y), fill='white', radius=5)) + instructions.append(draw.circle((x, y), fill=get_color(run_i), radius=3)) + try: + with self.videostream_processor.temporary(frame=step.image): + yield + finally: + for instruction in instructions: + draw.instructions.remove(instruction) + + def determine_pathing_manually(self) -> None: """Determine the target beam shifts `delta_x` and `delta_y` manually, based on the beam center found life (to find clicking offset) and `TrackingRun` to be used for crystal tracking in later experiment.""" - + run: TrackingRun = cast(TrackingRun, self.runs.tracking) self.restore_fast_adt_diff_for_image() - self.beamshift = self.get_beamshift() - self.ctrl.stage.a = run.table.loc[len(run.table) // 2, 'alpha'] - with self.ctrl.beam.unblanked(): - self.msg('Collecting tracking. Click on the center of the beam.') + self.ctrl.stage.a = run.table.loc[len(run) // 2, 'alpha'] + with self.ctrl.beam.unblanked(), self.ctrl.cam.unblocked(): + self.beamshift = self.get_beamshift() + self.msg1('Locate the beam (move it if needed) and click on its center.') with self.click_listener as cl: - click = cl.get_click() - beam_center_x, beam_center_y = click.x, click.y + obs_beampixel_xy = np.array(cl.get_click().xy) + cal_beampixel_yx = self.beamshift.beamshift_to_pixelcoord(self.ctrl.beamshift.get()) self.ctrl.restore('FastADT_track') - delta_xs, delta_ys = [], [] - Thread(target=self.enqueue_still_steps, args=(run,), daemon=True).start() - while (step := self.steps_queue.get()) is not None: - with self.videostream_processor.temporary(frame=step.image): - m = f'Click on the crystal (image={step.Index}, alpha={step.alpha} deg).' - self.msg(m) - with self.click_listener as cl: - click = cl.get_click() - delta_xs.append(click.x - beam_center_x) - delta_ys.append(click.y - beam_center_y) - self.msg('') - run.table['delta_x'] = delta_xs - run.table['delta_y'] = delta_ys - self.plot_tracking(tracking_run=run) - - def collect_stills(self, run: Run) -> None: - """Collect a series of stills at angles/exposure specified in `run`""" - self.msg('Collecting stills from {} to {} degree'.format(*run.scope)) - images, metas = [], [] - if run.has_beam_delta_information: - run.calculate_beamshifts(self.ctrl, self.beamshift) - - with self.ctrl.beam.unblanked(delay=0.2), self.ctrl.cam.blocked(): - for step in run.steps: - if run.has_beam_delta_information: - self.ctrl.beamshift.set(step.beamshift_x, step.beamshift_y) - self.ctrl.stage.a = step.alpha - image, meta = self.ctrl.get_image(exposure=run.exposure) - images.append(image) - metas.append(meta) - run.table['image'] = images - run.table['meta'] = metas - self.msg('Collected stills from {} to {} degree'.format(*run.scope)) - - def enqueue_still_steps(self, run: Run) -> None: - """Get & put stills to `self.tracking_queue` to eval asynchronously.""" - with self.ctrl.beam.unblanked(delay=0.2), self.ctrl.cam.blocked(): - for step in run.steps: - self.ctrl.stage.a = step.alpha - step.image = self.ctrl.get_image(exposure=run.exposure)[0] - self.steps_queue.put(step) - self.steps_queue.put(None) - - def collect_continuous(self, run: Run) -> None: - """Collect a series of scans at angles/exposure specified in `run`""" - self.msg('Collecting scans from {} to {} degree'.format(*run.scope)) - images, metas = [], [] - if run.has_beam_delta_information: - run.calculate_beamshifts(self.ctrl, self.beamshift) - rot_speed, run.exposure = self.determine_rotation_speed_and_exposure(run) + Thread(target=self.collect_run, args=(run,), daemon=True).start() + tracking_images = deque(maxlen=len(run)) + tracking_in_progress = True + while tracking_in_progress: + while (step := self.steps.get()) is not None: + self.msg1(f'Click on tracked point: {step.summary}.') + with self.displayed_pathing(step=step), self.click_listener: + click = self.click_listener.get_click() + delta_yx = (np.array(click.xy) - obs_beampixel_xy)[::-1] + click_beampixel_yx = cast(Sequence[float], cal_beampixel_yx + delta_yx) + click_beamshift_xy = self.beamshift.pixelcoord_to_beamshift(click_beampixel_yx) + cols = ['beampixel_x', 'beampixel_y', 'beamshift_x', 'beamshift_y'] + run.table.loc[step.Index, cols] = *click.xy, *click_beamshift_xy + tracking_images.append(step.image) + if 'image' not in run.table: + run.table['image'] = tracking_images + self.runs.pathing.append(deepcopy(run)) + + self.msg1('Displaying tracking. Click LEFT mouse button to start the experiment,') + self.msg2('MIDDLE to track another point, or RIGHT to cancel the experiment.') + for step in sawtooth(self.runs.tracking.steps): + with self.displayed_pathing(step=step): + with self.click_listener: + click = self.click_listener.get_click(timeout=0.5) + if click is None: + continue + self.msg2('') + if click.button == MouseButton.RIGHT: + self.msg1(msg := 'Experiment abandoned after tracking.') + raise FastADTEarlyTermination(msg) + if click.button == MouseButton.LEFT: + tracking_in_progress = False + else: # any other mouse button was clicked + for new_step in [*self.runs.tracking.steps, None]: + self.steps.put(new_step) + break + + def collect_run(self, run: Run) -> None: + """Collect `run.steps` and place them in `self.steps` Queue.""" + with self.ctrl.beam.unblanked(delay=0.2): + if run.continuous: + self._collect_scans(run=run) + else: + self._collect_stills(run=run) - self.ctrl.stage.a = float(run.table.loc[0, 'alpha']) + def _collect_scans(self, run: Run) -> None: + """Collect `run.steps` scans and place them in `self.steps` Queue.""" + rot_speed, run.exposure = self.determine_rotation_speed_and_exposure(run) + self.ctrl.stage.a = float(run.table.at[0, 'alpha']) + movie = self.ctrl.get_movie(n_frames=len(run) - 1, exposure=run.exposure) + target_alpha = float(run.table.iloc[-1].loc['alpha']) + run.collapse_to_alpha_midpoints() with self.ctrl.stage.rotation_speed(speed=rot_speed): - with self.ctrl.beam.unblanked(delay=0.2): - movie = self.ctrl.get_movie(n_frames=len(run.table) - 1, exposure=run.exposure) - a = float(run.table.iloc[-1].loc['alpha']) - self.ctrl.stage.set_with_speed(a=a, speed=rot_speed, wait=False) - for step, (image, header) in zip(run.steps, movie): - if run.has_beam_delta_information: - self.ctrl.beamshift.set(step.beamshift_x, step.beamshift_y) - images.append(image) - metas.append(header) - self.run = run.to_continuous() - self.run.table['image'] = images - self.run.table['meta'] = metas - self.msg('Collected scans from {} to {} degree'.format(*run.scope)) - - def determine_rotation_speed_and_exposure(self, run: Run) -> tuple[float, float]: - """Closest possible speed setting & exposure considering dead time.""" - detector_dead_time = self.get_dead_time(run.exposure) - time_for_one_frame = run.exposure + detector_dead_time - rot_calib = self.get_stage_rotation() - rot_plan = rot_calib.plan_rotation(time_for_one_frame / run.osc_angle) - exposure = abs(rot_plan.pace * run.osc_angle) - detector_dead_time - return rot_plan.speed, exposure - - def finalize(self) -> None: - self.msg(f'Saving experiment in: {self.path}') + self.ctrl.stage.set(a=target_alpha, wait=False) + for step, (image, meta) in zip(run.steps, movie): + self.msg2(f'Collecting {step.summary}.') + if run.has_beamshifts: + self.ctrl.beamshift.set(step.beamshift_x, step.beamshift_y) + self.steps.put(replace(step, image=image, meta=meta)) + self.steps.put(None) + self.msg2('') + + def _collect_stills(self, run: Run) -> None: + """Collect `run.steps` stills and place them in `self.steps` Queue.""" + for step in run.steps: + self.msg2(f'Collecting {step.summary}.') + if run.has_beamshifts: + self.ctrl.beamshift.set(step.beamshift_x, step.beamshift_y) + self.ctrl.stage.a = step.alpha + image, meta = self.ctrl.get_image(exposure=run.exposure) + self.steps.put(replace(step, image=image, meta=meta)) + self.steps.put(None) + self.msg2('') + + def get_run_output_path(self, run: DiffractionRun) -> Path: + """Return self.path if only 1 run done, self.path/sub## if multiple.""" + if len(self.runs.pathing) <= 1: + return self.path + return self.path / f'sub{self.runs.diffraction.index(run):02d}' + + def finalize(self, run: DiffractionRun) -> None: + """Create output directories and save provided run there.""" + out_path = self.get_run_output_path(run) + mrc_path = out_path / 'mrc' + tiff_path = out_path / 'tiff' + mrc_path.mkdir(exist_ok=True, parents=True) + tiff_path.mkdir(exist_ok=True, parents=True) + + self.msg1(f'Saving experiment in "{out_path}"...') rotation_axis = config.camera.camera_rotation_vs_stage_xy pixel_size = config.calibration['diff']['pixelsize'].get(self.camera_length, -1) physical_pixel_size = config.camera.physical_pixelsize # mm wavelength = config.microscope.wavelength # angstrom stretch_azimuth = config.camera.stretch_azimuth stretch_amplitude = config.camera.stretch_amplitude - - if self.diffraction_mode == 'continuous': - method = 'Continuous-Rotation 3D ED' - else: - method = 'Rotation Electron Diffraction' + m = 'Continuous-Rotation 3D ED' if run.continuous else 'Rotation Electron Diffraction' img_conv = ImgConversion( - buffer=self.run.buffer, - osc_angle=abs(self.run.osc_angle), - start_angle=self.run.table['alpha'].iloc[0], - end_angle=self.run.table['alpha'].iloc[-1], + buffer=run.buffer, + osc_angle=abs(run.osc_angle), + start_angle=run.table['alpha'].iloc[0], + end_angle=run.table['alpha'].iloc[-1], rotation_axis=rotation_axis, - acquisition_time=self.run.exposure, + acquisition_time=run.exposure, flatfield=self.flatfield, pixelsize=pixel_size, physical_pixelsize=physical_pixel_size, wavelength=wavelength, stretch_amplitude=stretch_amplitude, stretch_azimuth=stretch_azimuth, - method=method, + method=m, ) - img_conv.threadpoolwriter(tiff_path=self.tiff_path, mrc_path=self.mrc_path, workers=8) - img_conv.write_ed3d(self.mrc_path) - img_conv.write_pets_inp(self.path) - img_conv.write_beam_centers(self.path) - self.msg('Data collection and conversion done. FastADT experiment finalized.') - - def plot_tracking(self, tracking_run: Run) -> None: - """Plot tracking results in `VideoStreamFrame` and let user reject.""" - fig, ax1 = plt.subplots() - ax2 = ax1.twinx() - ax1.set_xlabel('alpha [degrees]') - ax1.set_ylabel('ΔX [pixels]') - ax2.set_ylabel('ΔY [pixels]') - ax1.yaxis.label.set_color('red') - ax2.yaxis.label.set_color('blue') - ax2.spines['left'].set_color('red') - ax2.spines['right'].set_color('blue') - ax1.tick_params(axis='y', colors='red') - ax2.tick_params(axis='y', colors='blue') - ax1.plot('alpha', 'delta_x', data=tracking_run.table, color='red', label='X') - ax2.plot('alpha', 'delta_y', data=tracking_run.table, color='blue', label='Y') - fig.tight_layout() - self.msg('Tracking results: left-click to accept, right-click to reject.') - with self.videostream_processor.temporary(figure=fig): - with self.click_listener as cl: - if cl.get_click().button != 1: - self.msg('Experiment abandoned after tracking.') - raise FastADTEarlyTermination('Experiment abandoned after tracking.') - def teardown(self) -> None: - self.finalize() + img_conv.threadpoolwriter(tiff_path=tiff_path, mrc_path=mrc_path, workers=8) + img_conv.write_ed3d(mrc_path) + img_conv.write_pets_inp(out_path) + img_conv.write_beam_centers(out_path) + self.msg1(f'Experiment saved in "{out_path}".') diff --git a/src/instamatic/gui/click_dispatcher.py b/src/instamatic/gui/click_dispatcher.py index 65c49b23..7b430cb2 100644 --- a/src/instamatic/gui/click_dispatcher.py +++ b/src/instamatic/gui/click_dispatcher.py @@ -2,6 +2,7 @@ import enum import queue +from dataclasses import dataclass from typing import Callable, Optional, Union from typing_extensions import Self @@ -10,26 +11,26 @@ class MouseButton(enum.IntEnum): + """Mirrors tkinter.Event event values.""" + LEFT = 1 MIDDLE = 2 RIGHT = 3 + SCROLL_UP = 4 + SCROLL_DOWN = 5 +@dataclass class ClickEvent: """Individual click event expected and handled by `ClickListener`s.""" - def __init__( - self, - x: Optional[int] = None, - y: Optional[int] = None, - button: Optional[int] = None, - ) -> None: - self.x = x if x else 0 - self.y = y if y else 0 - self.button = MouseButton(button) if button else MouseButton.LEFT + x: Optional[int] = None + y: Optional[int] = None + button: MouseButton = MouseButton.LEFT - def __repr__(self) -> str: - return f'{self.__class__.__name__}(x={self.x}, y={self.y}, button={self.button})' + @property + def xy(self) -> tuple[int, int]: + return self.x, self.y class ClickListener: diff --git a/src/instamatic/gui/fast_adt_frame.py b/src/instamatic/gui/fast_adt_frame.py index 4fc2ed22..4bae2c4f 100644 --- a/src/instamatic/gui/fast_adt_frame.py +++ b/src/instamatic/gui/fast_adt_frame.py @@ -1,10 +1,11 @@ from __future__ import annotations import threading +from functools import wraps from queue import Queue from tkinter import * from tkinter.ttk import * -from typing import Any, Optional +from typing import Any, Callable, Optional from instamatic import controller from instamatic.utils.spinbox import Spinbox @@ -13,7 +14,6 @@ pad0 = {'sticky': 'EW', 'padx': 0, 'pady': 1} pad10 = {'sticky': 'EW', 'padx': 10, 'pady': 1} -width = {'width': 19} angle_lim = {'from_': -90, 'to': 90, 'increment': 1, 'width': 20} angle_delta = {'from_': 0, 'to': 180, 'increment': 0.1, 'width': 20} duration = {'from_': 0, 'to': 60, 'increment': 0.1} @@ -49,22 +49,39 @@ def restore(self) -> None: class ExperimentalFastADTVariables: """A collection of tkinter Variable instances passed to the experiment.""" - def __init__(self): + def __init__(self, on_change: Optional[Callable[[], None]] = None) -> None: self.diffraction_mode = StringVar() self.diffraction_start = DoubleVar(value=-30) self.diffraction_stop = DoubleVar(value=30) self.diffraction_step = DoubleVar(value=0.5) self.diffraction_time = DoubleVar(value=0.5) - self.tracking_mode = StringVar() + self.tracking_algo = StringVar() self.tracking_time = DoubleVar(value=0.5) self.tracking_step = DoubleVar(value=5.0) + if on_change: + self._add_callback(on_change) + + def _add_callback(self, callback: Callable[[], None]) -> None: + """Add a safe trace callback to all `Variable` instances in self.""" + + @wraps(callback) + def safe_callback(*_): + try: + callback() + except TclError as e: # Ignore invalid/incomplete GUI edits + if 'expected floating-point number' not in str(e): + raise + except AttributeError as e: # Ignore incomplete initialization + if 'object has no attribute' not in str(e): + raise + + for name, var in vars(self).items(): + if isinstance(var, Variable): + var.trace_add('write', safe_callback) + def as_dict(self): - return { - v: getattr(self, v).get() - for v in dir(self) - if isinstance(getattr(self, v), Variable) - } + return {n: v.get() for n, v in vars(self).items() if isinstance(v, Variable)} class ExperimentalFastADT(LabelFrame, HasQMixin): @@ -73,7 +90,7 @@ class ExperimentalFastADT(LabelFrame, HasQMixin): def __init__(self, parent): super().__init__(parent, text='Experiment with a priori tracking options') self.parent = parent - self.var = ExperimentalFastADTVariables() + self.var = ExperimentalFastADTVariables(on_change=self.update_widget) self.q: Optional[Queue] = None self.busy: bool = False self.ctrl = controller.get_instance() @@ -82,11 +99,9 @@ def __init__(self, parent): f = Frame(self) Label(f, text='Diffraction mode:').grid(row=3, column=0, **pad10) - self.diffraction_mode = Combobox(f, textvariable=self.var.diffraction_mode, **width) - self.diffraction_mode['values'] = ['stills', 'continuous'] - self.diffraction_mode['state'] = 'readonly' + m = ['stills', 'continuous'] + self.diffraction_mode = OptionMenu(f, self.var.diffraction_mode, m[0], *m) self.diffraction_mode.grid(row=3, column=1, **pad10) - self.diffraction_mode.current(0) Label(f, text='Diffraction start (deg):').grid(row=4, column=0, **pad10) var = self.var.diffraction_start @@ -108,14 +123,11 @@ def __init__(self, parent): self.diffraction_time = Spinbox(f, textvariable=var, **duration) self.diffraction_time.grid(row=7, column=1, **pad10) - Label(f, text='Tracking mode:').grid(row=3, column=2, **pad10) - var = self.var.tracking_mode - self.tracking_mode = Combobox(f, textvariable=var, **width) - self.tracking_mode['values'] = ['none', 'manual'] - self.tracking_mode['state'] = 'readonly' - self.tracking_mode.grid(row=3, column=3, **pad10) - self.tracking_mode.bind('<>', self.update_widget_state) - self.tracking_mode.current(0) + Label(f, text='Tracking algorithm:').grid(row=3, column=2, **pad10) + var = self.var.tracking_algo + m = ['none', 'manual'] + self.tracking_algo = OptionMenu(f, var, m[0], *m) + self.tracking_algo.grid(row=3, column=3, **pad10) Label(f, text='Tracking step (deg):').grid(row=6, column=2, **pad10) var = self.var.tracking_step @@ -176,25 +188,46 @@ def __init__(self, parent): Separator(f, orient=HORIZONTAL).grid(row=11, columnspan=4, sticky=EW, padx=10, pady=10) - # Center-aligned sticky message area and bottom start button + # Center-aligned sticky message areas 1, 2, and the bottom start button f = Frame(self) - self.message = StringVar(value='Further information will appear here.') - self.message_area = Label(f, textvariable=self.message, anchor=NW) - self.message_area.pack(fill='both', expand=True) + self.message1 = StringVar(value='Further information will appear here.') + self.message1_area = Label(f, textvariable=self.message1, anchor=NW) + self.message1_area.pack(fill='x') + + self.message2 = StringVar(value='') + self.message2_area = Label(f, textvariable=self.message2, anchor=NW) + self.message2_area.pack(fill='both', expand=True) f.pack(side='top', fill='both', expand=True, padx=10) self.start_button = Button(self, text='Start', width=1, command=self.start_collection) self.start_button.pack(side='bottom', fill='x', padx=10, pady=10) - self.update_widget_state() + self.update_widget() + + def estimate_times(self) -> tuple[float, float]: + """Estimate time needed for tracking + each diffraction in seconds.""" + a_span = abs(self.var.diffraction_start.get() - self.var.diffraction_stop.get()) + try: + track_step = self.var.tracking_step.get() + except TclError: + track_step = 0.001 + try: + diff_step = self.var.diffraction_step.get() + except TclError: + diff_step = 0.001 + track_time = 0 + if self.var.tracking_algo.get() != 'none': + track_time = self.var.tracking_time.get() * a_span / track_step + diff_time = self.var.diffraction_time.get() * a_span / diff_step + return track_time, diff_time def toggle_beam_blank(self) -> None: (self.ctrl.beam.unblank if self.ctrl.beam.is_blanked else self.ctrl.beam.blank)() - def update_widget_state(self, *_, busy: Optional[bool] = None, **__) -> None: + def update_widget(self, *_, busy: Optional[bool] = None, **__) -> None: self.busy = busy if busy is not None else self.busy - no_tracking = self.var.tracking_mode.get() == 'none' + no_tracking = self.var.tracking_algo.get() == 'none' widget_state = 'disabled' if self.busy else 'enabled' tracking_state = 'disabled' if self.busy or no_tracking else 'enabled' @@ -204,11 +237,19 @@ def update_widget_state(self, *_, busy: Optional[bool] = None, **__) -> None: self.diffraction_stop.config(state=widget_state) self.diffraction_step.config(state=widget_state) self.diffraction_time.config(state=widget_state) - self.tracking_mode.config(state=widget_state) - + self.tracking_algo.config(state=widget_state) self.tracking_step.config(state=tracking_state) self.tracking_time.config(state=tracking_state) + tracking_time, diffraction_time = self.estimate_times() + tt = '{:.0f}:{:02.0f}'.format(*divmod(tracking_time, 60)) + dt = '{:.0f}:{:02.0f}'.format(*divmod(diffraction_time, 60)) + if tracking_time: # don't display tracking time or per-attempts if zero + msg = f'Estimated time required: {tt} + {dt} / tracking.' + else: + msg = f'Estimated time required: {dt}.' + self.message2.set(msg) + def start_collection(self) -> None: self.q.put(('fast_adt', {'frame': self, **self.var.as_dict()})) @@ -231,14 +272,13 @@ def fast_adt_interface_command(controller, **params: Any) -> None: videostream_frame=videostream_frame, ) try: - fast_adt_frame.update_widget_state(busy=True) + fast_adt_frame.update_widget(busy=True) controller.fast_adt.start_collection(**params) - controller.fast_adt.finalize() except RuntimeError: pass # RuntimeError is raised if experiment is terminated early finally: del controller.fast_adt - fast_adt_frame.update_widget_state(busy=False) + fast_adt_frame.update_widget(busy=False) module = BaseModule( diff --git a/src/instamatic/gui/videostream_processor.py b/src/instamatic/gui/videostream_processor.py index f10cb7ab..bba18118 100644 --- a/src/instamatic/gui/videostream_processor.py +++ b/src/instamatic/gui/videostream_processor.py @@ -173,7 +173,8 @@ def temporary( self.temporary_image = image elif figure is not None: self.temporary_image = self.render_figure(figure) - yield + with self.vsf.stream.blocked(): + yield finally: self.temporary_frame, self.temporary_image = pre_context_values diff --git a/src/instamatic/utils/iterating.py b/src/instamatic/utils/iterating.py new file mode 100644 index 00000000..10152f36 --- /dev/null +++ b/src/instamatic/utils/iterating.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +import itertools +from typing import Iterable, Iterator, TypeVar + +T = TypeVar('T') + + +def sawtooth(iterator: Iterable[T]) -> Iterator[T]: + """Iterate elements of input sequence back and forth, repeating edges.""" + yield from itertools.cycle((seq := list(iterator)) + list(reversed(seq))) diff --git a/tests/test_experiments.py b/tests/test_experiments.py index 521dc259..4240d125 100644 --- a/tests/test_experiments.py +++ b/tests/test_experiments.py @@ -61,7 +61,7 @@ class ExperimentTestCase(InstanceAutoTracker): fast_adt_common_collect_kwargs = { 'diffraction_step': 0.5, 'diffraction_time': 0.01, - 'tracking_mode': 'none', + 'tracking_algo': 'none', 'tracking_time': 0.01, }