Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/pyclm/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from .experiments import ExperimentSchedule
from .manager import Manager, MicroscopeOutbox, SLMBuffer
from .microscope import MicroscopeProcess
from .patterns import ROI, CameraProperties, PatternMethod, PatternProcess
from .pattern_process import PatternProcess
from .patterns import ROI, CameraProperties, PatternMethod
from .queues import AllQueues
from .segmentation import SegmentationMethod, SegmentationProcess
from .segmentation import SegmentationMethod
from .segmentation_process import SegmentationProcess
80 changes: 80 additions & 0 deletions src/pyclm/core/base_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import logging
from collections.abc import Callable
from multiprocessing import Queue
from queue import Empty
from threading import Event
from time import sleep
from typing import Any, NamedTuple

logger = logging.getLogger(__name__)


class QueueHandler(NamedTuple):
queue: Queue
handler: Callable[[Any], None]


class BaseProcess:
"""
Base class for processes that poll queues.
Eliminates busy waiting by sleeping when all queues are empty.
"""

def __init__(self, stop_event: Event | None = None, name: str = "process"):
self.stop_event = stop_event
self.name = name
self.queues: list[QueueHandler] = []
self.sleep_interval = 0.001

def register_queue(self, queue: Queue, handler: Callable[[Any], bool | None]):
"""
Register a queue to be polled.
:param queue: The multiprocessing Queue to poll.
:param handler: A callable that takes the item from the queue.
It can optionally return True to signal the process loop to break (stop).
"""
self.queues.append(QueueHandler(queue, handler))

def process(self):
"""
Main process loop.
Polls all registered queues. SLEEPS if no work was done in a cycle.
"""
logger.info(f"Started {self.name}")

while True:
if self.stop_event and self.stop_event.is_set():
logger.info(f"Force closing {self.name}")
break

did_work = False

for q_handler in self.queues:
queue = q_handler.queue
handler = q_handler.handler

if not queue.empty():
try:
item = queue.get_nowait()

should_stop = handler(item)
if should_stop:
logger.info(
f"{self.name} received stop signal from handler"
)
return

did_work = True
except Empty:
# Race condition handling: empty() said False but get_nowait() raised Empty
pass
except Exception as e:
logger.error(
f"Error handling item in {self.name}: {e}", exc_info=True
)

# If no queues had items, sleep briefly to avoid 100% CPU
if not did_work:
sleep(self.sleep_interval)

logger.info(f"Stopped {self.name}")
119 changes: 58 additions & 61 deletions src/pyclm/core/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from cv2 import warpAffine
from h5py import File

from pyclm.core.pattern_process import RequestPattern

from .datatypes import (
AcquisitionData,
CameraPattern,
Expand All @@ -35,7 +37,6 @@
Experiment,
ExperimentSchedule,
ImagingConfig,
PositionWithAutoFocus,
TimeCourse,
)
from .messages import (
Expand All @@ -47,59 +48,54 @@
UpdateZPositionMessage,
)
from .patterns import AcquiredImageRequest
from .patterns.pattern_process import RequestPattern
from .queues import AllQueues

logger = logging.getLogger(__name__)


from threading import Event

from .base_process import BaseProcess


class DataPassingProcess(metaclass=ABCMeta):
class DataPassingProcess(BaseProcess, metaclass=ABCMeta):
def __init__(self, aq: AllQueues, stop_event: Event | None = None):
super().__init__(stop_event, name="data passing process")
self.all_queues = aq
self.stop_event = stop_event

self.message_history = []

# Subclasses should set these or register queues manually
self.from_manager = None
self.data_in = None

self.class_name = "data passing process"

def process(self):
while True:
if self.stop_event and self.stop_event.is_set():
print(f"{self.class_name} force closing")
break

if not self.from_manager.empty():
msg = self.from_manager.get()

must_break = self.handle_message(msg)

if must_break:
break
def initialize_queues(self):
# Helper to register standard queues if subclasses set attributes
if self.from_manager:
self.register_queue(self.from_manager, self.handle_message_wrapper)

for data_channel in self.data_in:
# print(self.class_name, data_channel)
if not data_channel.empty():
data = data_channel.get()
if self.data_in:
for q in self.data_in:
self.register_queue(q, self.handle_data_wrapper)

if isinstance(data, Message):
print(self.class_name, data.message)
must_break = self.handle_message(data)

if must_break:
print(f"{self.class_name} exiting from message")
return True
def handle_message_wrapper(self, msg):
"""Wrapper to handle return value logic expected by BaseProcess"""
if isinstance(msg, Message):
# BaseProcess expects True to stop
return self.handle_message(msg)
return False

assert isinstance(data, GenericData), (
f"Unexpected data type: {type(data)}, expected subtype of GenericData"
)
def handle_data_wrapper(self, data):
"""Wrapper to handle data or message in data channel"""
if isinstance(data, Message):
print(self.name, data.message)
return self.handle_message(data)

self.handle_data(data)
assert isinstance(data, GenericData), (
f"Unexpected data type: {type(data)}, expected subtype of GenericData"
)
self.handle_data(data)
return False

@abstractmethod
def handle_data(self, data):
Expand Down Expand Up @@ -127,6 +123,7 @@ def __init__(
stop_event: Event | None = None,
):
super().__init__(aq, stop_event)
self.name = "microscope outbox"

if base_path is None:
base_path = Path().cwd()
Expand All @@ -145,7 +142,7 @@ def __init__(
self.base_path = base_path
self.save_type = save_type

self.class_name = "microscope outbox"
self.initialize_queues()

def handle_data(self, data):
aq_event = data.event
Expand All @@ -155,7 +152,7 @@ def handle_data(self, data):
if isinstance(data, SegmentationData):
return

print(aq_event)
# print(aq_event)

if aq_event.segment:
self.seg_queue.put(data)
Expand Down Expand Up @@ -208,36 +205,39 @@ def write_data(self, data: AcquisitionData):
if isinstance(data, SegmentationData):
dset_name = r"seg"

if self.save_type == "tif":
fullpath = self.base_path / file_relpath / relpath
fullpath.mkdir(parents=True)
try:
if self.save_type == "tif":
fullpath = self.base_path / file_relpath / relpath
fullpath.mkdir(parents=True, exist_ok=True)

tifffile.imwrite(fullpath / "data.tif", data.data)
tifffile.imwrite(fullpath / "data.tif", data.data)

else:
filepath = self.base_path / f"{file_relpath}.hdf5"
with File(filepath, "a") as f:
if aq_event.save_output:
dset = f.create_dataset(relpath + dset_name, data=data.data)
aq_event.write_attrs(dset)

if isinstance(data, StimulationData):
if aq_event.save_stim:
dset = f.create_dataset(relpath + r"dmd", data=data.dmd_pattern)
dset.attrs["pattern_id"] = str(data.pattern_id)
else:
filepath = self.base_path / f"{file_relpath}.hdf5"
# Ensure directory exists
filepath.parent.mkdir(parents=True, exist_ok=True)

with File(filepath, "a") as f:
if aq_event.save_output:
dset = f.create_dataset(relpath + dset_name, data=data.data)
aq_event.write_attrs(dset)

# def test_write_data(self):
# aq_event = AcquisitionEvent("test", Position(1, 2, 0), scheduled_time=0, exposure_time_ms=1,
# sub_axes=[0, "test"])
# data = AcquisitionData(aq_event, np.random.rand(100, 100))
#
# self.write_data(data)
if isinstance(data, StimulationData):
if aq_event.save_stim:
dset = f.create_dataset(
relpath + r"dmd", data=data.dmd_pattern
)
dset.attrs["pattern_id"] = str(data.pattern_id)
aq_event.write_attrs(dset)

except Exception as e:
logger.error(f"Failed to write data: {e}", exc_info=True)


class SLMBuffer(DataPassingProcess):
def __init__(self, aq: AllQueues, stop_event: Event | None = None):
super().__init__(aq, stop_event)
self.name = "slm buffer"

self.from_manager = aq.manager_to_slm_buffer
self.data_in = [aq.pattern_to_slm]
Expand All @@ -251,15 +251,12 @@ def __init__(self, aq: AllQueues, stop_event: Event | None = None):
self.slm_shape = None
self.affine_transform = None

self.slm_shape = None
self.affine_transform = None

self.initialized = False

self.manager_done = False
self.pattern_done = False

self.class_name = "slm buffer"
self.initialize_queues()

def initialize(
self,
Expand Down
23 changes: 8 additions & 15 deletions src/pyclm/core/microscope.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@
logger = logging.getLogger(__name__)


class MicroscopeProcess:
from .base_process import BaseProcess


class MicroscopeProcess(BaseProcess):
def __init__(
self, core: CMMCorePlus, aq: AllQueues, stop_event: Event | None = None
):
super().__init__(stop_event, name="microscope")
self.core = core
self.stop_event = stop_event

self.inbox = aq.manager_to_microscope # receives messages/events from manager
self.manager = aq.microscope_to_manager # send messages to manager
self.outbox = aq.acquisition_outbox # send acquisition data to outbox process
Expand Down Expand Up @@ -75,6 +79,8 @@ def process(self, event_await_s=0, slm_await_s=5):
f"No events in queue for {time() - event_await_start: .3f}s"
)

# Sleep briefly to be nice
sleep(self.sleep_interval)
continue

msg = self.inbox.get()
Expand Down Expand Up @@ -284,17 +290,13 @@ def handle_acquisition_event(self, aq_event: AcquisitionEvent):

sleep(1.0)

# print(aq_event.position.get_z(), self.core.getPosition())

logger.info(f"{self.t(): .3f}| acquiring image: {aq_event.exposure_time_ms}ms")
image = self.snap()
aq_event.completed_time = time()
logger.info(f"{self.t(): .3f}| image acquired")

aq_event.pixel_width_um = self.core.getPixelSizeUm()

# info(f"{self.t(): .3f}| unloading")

if aq_event.needs_slm:
data_out = StimulationData(
aq_event, image, self.current_pattern, self.current_pattern_id
Expand All @@ -303,7 +305,6 @@ def handle_acquisition_event(self, aq_event: AcquisitionEvent):
data_out = AcquisitionData(aq_event, image)

self.outbox.put(data_out)
# info(f"{self.t(): .3f}| unloaded")

def snap(self):
core = self.core
Expand All @@ -315,11 +316,3 @@ def snap(self):

def t(self):
return time() - self.start

# tagged_image = core.getTaggedImage()
# pixels = np.reshape(tagged_image.pix,
# newshape=[tagged_image.tags['Height'], tagged_image.tags['Width']])
#
# tags = tagged_image.tags
#
# return pixels, tags
Loading