diff --git a/src/instamatic/experiments/fast_adt/experiment.py b/src/instamatic/experiments/fast_adt/experiment.py index 616484aa..ea87ffb2 100644 --- a/src/instamatic/experiments/fast_adt/experiment.py +++ b/src/instamatic/experiments/fast_adt/experiment.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import re from collections import deque from contextlib import contextmanager from copy import deepcopy @@ -15,6 +16,7 @@ import numpy as np import pandas as pd from matplotlib import pyplot as plt +from pandas.errors import EmptyDataError, ParserError from PIL.Image import Image from typing_extensions import Self @@ -97,6 +99,25 @@ def __str__(self) -> str: def __len__(self) -> int: return len(self.table) + @classmethod + def from_csv(cls, path: AnyPath) -> Self: + """Read self, table from a csv and metadata from its comment header.""" + with open(path, 'r') as csv_file: + header = csv_file.readline().lstrip('# ').strip() + match = re.search(r'exposure=([^,]+),\s*continuous=(\S+)', header) + e = float(match.group(1)) + c = match.group(2) == 'True' + table = pd.read_csv(path, sep=',', comment='#') + return cls(exposure=e, continuous=c, **{k: col for (k, col) in table.items()}) + + def to_csv(self, path: Path) -> None: + """Write self.table to a csv and metadata to its comment header.""" + header = f'# exposure={self.exposure:.6g}, continuous={self.continuous}\n' + saved_cols = [c for c in self.table.columns if c != 'image'] + with open(path, 'w') as csv_file: + csv_file.write(header) + self.table.to_csv(path, mode='a', header=header, index=False, columns=saved_cols) + @property def steps(self) -> Iterator[Step]: """Iterate over individual run `Step`s holding rows of `self.table`.""" @@ -277,6 +298,12 @@ def get_stage_rotation(self) -> CalibStageRotation: self.msg2(m2 := 'Please run `instamatic.calibrate_stage_rotation` first.') raise FastADTMissingCalibError(m1 + ' ' + m2) + def get_tracking_directory(self) -> Path: + """Return a Path to tracking directory, create if it doesn't exist.""" + tracking_dir = self.path / 'tracking' + tracking_dir.mkdir(parents=True, exist_ok=True) + return tracking_dir + def determine_rotation_speed_and_exposure(self, run: Run) -> tuple[float, float]: """Closest possible speed setting & exposure considering dead time.""" detector_dead_time = self.get_dead_time(run.exposure) @@ -340,7 +367,12 @@ def start_collection(self, **params) -> None: if params['tracking_algo'] == 'manual': self.binsize = self.ctrl.cam.default_binsize self.runs.tracking = TrackingRun.from_params(params) - self.determine_pathing_manually() + self.pathing_determine_manually() + self.pathing_save_to_files() + elif params['tracking_algo'] == 'load': + pathing_paths = params['tracking_details'].split(';') + self.pathing_load_from_files(*pathing_paths) + self.pathing_save_to_files() for pathing_run in self.runs.pathing: new_run = DiffractionRun.from_params(params) new_run.add_beamshifts(pathing_run) @@ -363,7 +395,7 @@ def start_collection(self, **params) -> None: self.ctrl.stage.a = 0.0 @contextmanager - def displayed_pathing(self, step: Step) -> None: + def pathing_displayed(self, step: Step) -> None: """Display step image with dots representing existing pathing.""" draw = self.videostream_processor.draw instructions: list[draw.Instruction] = [] @@ -379,9 +411,9 @@ def displayed_pathing(self, step: Step) -> None: for instruction in instructions: draw.instructions.remove(instruction) - def determine_pathing_manually(self) -> None: - """Determine the target beam shifts `delta_x` and `delta_y` manually, - based on the beam center found life (to find clicking offset) and + def pathing_determine_manually(self) -> None: + """Determine the target `beamshift_x` and `beamshift_y` manually, based + on the beam center found life (to find clicking offset) and `TrackingRun` to be used for crystal tracking in later experiment.""" run: TrackingRun = cast(TrackingRun, self.runs.tracking) self.restore_fast_adt_diff_for_image() @@ -401,7 +433,7 @@ def determine_pathing_manually(self) -> None: while tracking_in_progress: while (step := self.steps.get()) is not None: self.msg1(f'Click on tracked point: {step.summary}.') - with self.displayed_pathing(step=step), self.click_listener: + with self.pathing_displayed(step=step), self.click_listener: click = self.click_listener.get_click() click_xy = np.array(click.xy) * self.binsize delta_yx = (click_xy - obs_beampixel_xy)[::-1] @@ -417,7 +449,7 @@ def determine_pathing_manually(self) -> None: self.msg1('Displaying tracking. Click LEFT mouse button to start the experiment,') self.msg2('MIDDLE to track another point, or RIGHT to cancel the experiment.') for step in sawtooth(self.runs.tracking.steps): - with self.displayed_pathing(step=step): + with self.pathing_displayed(step=step): image = self.videostream_processor.image image.info['_annotated_runs'] = len(self.runs.pathing) tracking_images[step.Index] = image @@ -428,6 +460,7 @@ def determine_pathing_manually(self) -> None: self.msg2('') if click.button == MouseButton.RIGHT: self.msg1(msg := 'Experiment abandoned after tracking.') + self.pathing_save_to_files() raise FastADTEarlyTermination(msg) if click.button == MouseButton.LEFT: tracking_in_progress = False @@ -436,15 +469,31 @@ def determine_pathing_manually(self) -> None: self.steps.put(new_step) break - drc = self.path / 'tracking' - drc.mkdir(parents=True, exist_ok=True) + td = self.get_tracking_directory() with self.ctrl.cam.blocked(): for step, image in zip(run.steps, tracking_images): i = f'image{step.Index:02d}_al{step.alpha:+03.0f}.png'.replace('+', '0') if image is None or image.info['_annotated_runs'] < len(self.runs.pathing): - with self.displayed_pathing(step=step): + with self.pathing_displayed(step=step): image = self.videostream_processor.image - self.videostream_processor.vsf.save_image(image=image, path=drc / i) + self.videostream_processor.vsf.save_image(image=image, path=td / i) + + def pathing_load_from_files(self, *paths: AnyPath) -> None: + """Load pathing runs with 'beamshift_x/y' from existing csv files.""" + for path in paths: + try: + tracking = TrackingRun.from_csv(path=path) + if m := {'beamshift_x', 'beamshift_y'} - set(tracking.table.columns): + raise KeyError(f'Missing columns: {", ".join(m)}') + self.runs.pathing.append(tracking) + except (EmptyDataError, KeyError, OSError, ParserError) as e: + self.msg2(f'{type(e).__name__}: {e} while loading "{path}".') + + def pathing_save_to_files(self) -> None: + """Save all pathing runs to separate tracking_# files.""" + td = self.get_tracking_directory() + for i, run in enumerate(self.runs.pathing): + run.to_csv(path=td / f'path_{i + 1}.csv') def collect_run(self, run: Run) -> None: """Collect `run.steps` and place them in `self.steps` Queue.""" diff --git a/src/instamatic/gui/fast_adt_frame.py b/src/instamatic/gui/fast_adt_frame.py index bfceb0e4..6c57b68a 100644 --- a/src/instamatic/gui/fast_adt_frame.py +++ b/src/instamatic/gui/fast_adt_frame.py @@ -2,6 +2,7 @@ from functools import wraps from tkinter import * +from tkinter.filedialog import askopenfilenames from tkinter.ttk import * from typing import Any, Callable, Optional @@ -54,6 +55,7 @@ def __init__(self, on_change: Optional[Callable[[], None]] = None) -> None: self.diffraction_step = DoubleVar(value=0.5) self.diffraction_time = DoubleVar(value=0.5) self.tracking_algo = StringVar() + self.tracking_details = StringVar(value='') self.tracking_time = DoubleVar(value=0.5) self.tracking_step = DoubleVar(value=5.0) @@ -122,7 +124,8 @@ def __init__(self, parent): Label(f, text='Tracking algorithm:').grid(row=3, column=2, **pad10) var = self.var.tracking_algo - m = ['none', 'manual'] + var.trace_add('write', self.load_path_files) + m = ['none', 'manual', 'load'] self.tracking_algo = OptionMenu(f, var, m[0], *m) self.tracking_algo.grid(row=3, column=3, **pad10) @@ -219,12 +222,28 @@ def estimate_times(self) -> tuple[float, float]: diff_time = self.var.diffraction_time.get() * a_span / diff_step return track_time, diff_time + def load_path_files(self, *_) -> None: + if self.var.tracking_algo.get() == 'load': + paths = askopenfilenames( + filetypes=[('CSV files', '*.csv'), ('All files', '*')], + initialdir=self.app.get_module('io').get_experiment_directory(), + parent=self, + title='Select tracking path files to use', + ) + if not paths: + if not self.var.tracking_details.get(): + self.var.tracking_algo.set('none') + else: + self.var.tracking_details.set(';'.join(paths)) + else: + self.var.tracking_details.set('') + def toggle_beam_blank(self) -> None: (self.ctrl.beam.unblank if self.ctrl.beam.is_blanked else self.ctrl.beam.blank)() def update_widget(self, *_, busy: Optional[bool] = None, **__) -> None: self.busy = busy if busy is not None else self.busy - no_tracking = self.var.tracking_algo.get() == 'none' + no_tracking = self.var.tracking_algo.get() in ('none', 'load') widget_state = 'disabled' if self.busy else 'enabled' tracking_state = 'disabled' if self.busy or no_tracking else 'enabled' @@ -244,10 +263,15 @@ def update_widget(self, *_, busy: Optional[bool] = None, **__) -> None: return tt = '{:.0f}:{:02.0f}'.format(*divmod(tracking_time, 60)) dt = '{:.0f}:{:02.0f}'.format(*divmod(diffraction_time, 60)) - if tracking_time: # don't display tracking time or per-attempts if zero - msg = f'Estimated time required: {tt} + {dt} / tracking.' - else: - msg = f'Estimated time required: {dt}.' + + if (ta := self.var.tracking_algo.get()) == 'none': + msg = f'Minimum time required: {dt}.' + elif ta == 'manual': + msg = f'Minimum time required: {tt} + {dt} / tracking.' + else: # ta == 'load' + track_count = 1 + self.var.tracking_details.get().count(';') + st = '{:.0f}:{:02.0f}'.format(*divmod(diffraction_time * track_count, 60)) + msg = f'Minimum time required: {dt} x {track_count} paths loaded = {st}.' self.message2.set(msg) def start_collection(self) -> None: