|
| 1 | +#!/usr/bin/env python |
| 2 | +# -*- coding: utf-8 -*- |
| 3 | + |
| 4 | +## Copyright (C) 2019 David Miguel Susano Pinto <david.pinto@bioch.ox.ac.uk> |
| 5 | +## |
| 6 | +## Microscope is free software: you can redistribute it and/or modify |
| 7 | +## it under the terms of the GNU General Public License as published by |
| 8 | +## the Free Software Foundation, either version 3 of the License, or |
| 9 | +## (at your option) any later version. |
| 10 | +## |
| 11 | +## Microscope is distributed in the hope that it will be useful, |
| 12 | +## but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 13 | +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 14 | +## GNU General Public License for more details. |
| 15 | +## |
| 16 | +## You should have received a copy of the GNU General Public License |
| 17 | +## along with Microscope. If not, see <http://www.gnu.org/licenses/>. |
| 18 | + |
| 19 | +"""Lumencor Spectra Light Engine. |
| 20 | +
|
| 21 | +The implementation here is limited to the Lumencor Spectra III but |
| 22 | +should be trivial to make it work for other Lumencor light engines. |
| 23 | +We only need access to other such devices. |
| 24 | +
|
| 25 | +.. note:: |
| 26 | +
|
| 27 | + The engine is expected to be on the standard mode communications |
| 28 | + (not legacy). This can be changed via the device web interface. |
| 29 | +""" |
| 30 | + |
| 31 | +import threading |
| 32 | +import typing |
| 33 | + |
| 34 | +import serial |
| 35 | + |
| 36 | +import microscope.devices |
| 37 | + |
| 38 | + |
| 39 | +class _SyncSerial: |
| 40 | + """Wraps a `Serial` instance with a lock for synchronization.""" |
| 41 | + def __init__(self, serial: serial.Serial) -> None: |
| 42 | + self._serial = serial |
| 43 | + self._lock = threading.RLock() |
| 44 | + |
| 45 | + @property |
| 46 | + def lock(self) -> threading.RLock: |
| 47 | + return self._lock |
| 48 | + |
| 49 | + def readline(self) -> bytes: |
| 50 | + with self._lock: |
| 51 | + return self._serial.readline() |
| 52 | + |
| 53 | + def write(self, data: bytes) -> int: |
| 54 | + with self._lock: |
| 55 | + return self._serial.write(data) |
| 56 | + |
| 57 | + |
| 58 | +class _SpectraIIIConnection: |
| 59 | + """Connection to a Spectra III Light Engine. |
| 60 | +
|
| 61 | + This module makes checks for Spectra III light engine and it was |
| 62 | + only tested for it. But it should work with other lumencor light |
| 63 | + engines with little work though, if only we got access to them. |
| 64 | + """ |
| 65 | + def __init__(self, serial: _SyncSerial) -> None: |
| 66 | + self._serial = serial |
| 67 | + # We use command() and readline() instead of get_command() in |
| 68 | + # case this is not a Lumencor and won't even give a standard |
| 69 | + # answer and raises an exception during the answer validation. |
| 70 | + self._serial.write(b'GET MODEL\n') |
| 71 | + answer = self._serial.readline() |
| 72 | + if not answer.startswith(b'A MODEL Spectra III'): |
| 73 | + raise RuntimeError("Not a Lumencor Spectra III Light Engine") |
| 74 | + |
| 75 | + def command_and_answer(self, *TX_tokens: bytes) -> bytes: |
| 76 | + # Command contains two or more tokens. The first token for a |
| 77 | + # TX (transmitted) command string is one of the two keywords |
| 78 | + # GET, SET (to query or to set). The second token is the |
| 79 | + # command name. |
| 80 | + assert len(TX_tokens) >= 2, 'invalid command with less than two tokens' |
| 81 | + assert TX_tokens[0] in (b'GET', b'SET'), 'invalid command (not SET/GET)' |
| 82 | + |
| 83 | + TX_command = b' '.join(TX_tokens) + b'\n' |
| 84 | + with self._serial.lock: |
| 85 | + self._serial.write(TX_command) |
| 86 | + answer = self._serial.readline() |
| 87 | + RX_tokens = answer.split(maxsplit=2) |
| 88 | + # A received answer has at least two tokens. The first token |
| 89 | + # is A or E (for success or failure). The second token is the |
| 90 | + # command name (second token of the transmitted command). |
| 91 | + if (len(RX_tokens) < 2 or RX_tokens[0] != b'A' |
| 92 | + or RX_tokens[1] != TX_tokens[1]): |
| 93 | + raise RuntimeError('command %s failed: %s' % (TX_command, answer)) |
| 94 | + return answer |
| 95 | + |
| 96 | + def get_command(self, command: bytes, *args: bytes) -> bytes: |
| 97 | + answer = self.command_and_answer(b'GET', command, *args) |
| 98 | + # The three bytes we remove at the start are the 'A ' before |
| 99 | + # the command, and the space after the command. The last two |
| 100 | + # bytes are '\r\n'. |
| 101 | + return answer[3+len(command):-2] |
| 102 | + |
| 103 | + def set_command(self, command: bytes, *args: bytes) -> None: |
| 104 | + self.command_and_answer(b'SET', command, *args) |
| 105 | + |
| 106 | + def get_channel_map(self) -> typing.List[typing.Tuple[int, str]]: |
| 107 | + answer = self.get_command(b'CHMAP') |
| 108 | + return list(enumerate(answer.decode().split())) |
| 109 | + |
| 110 | + |
| 111 | +class _LightChannelConnection: |
| 112 | + """Commands for a channel in a Lumencor light engine.""" |
| 113 | + def __init__(self, connection: _SpectraIIIConnection, index: int) -> None: |
| 114 | + self._conn = connection |
| 115 | + self._index_bytes = b'%d' % index |
| 116 | + |
| 117 | + def get_light_state(self) -> bool: |
| 118 | + """On (True) or off (False) state""" |
| 119 | + # We use CHACT (actual light state) instead of CH (light |
| 120 | + # state) because CH checks both the TTL inputs and channel |
| 121 | + # state switches. |
| 122 | + state = self._conn.get_command(b'CHACT', self._index_bytes) |
| 123 | + if state == b'1': |
| 124 | + return True |
| 125 | + elif state == b'0': |
| 126 | + return False |
| 127 | + else: |
| 128 | + raise RuntimeError('unexpected answer') |
| 129 | + |
| 130 | + def set_light_state(self, state: bool) -> None: |
| 131 | + """Turn light on (True) or off (False).""" |
| 132 | + state_arg = b'1' if state else b'0' |
| 133 | + self._conn.set_command(b'CH', self._index_bytes, state_arg) |
| 134 | + |
| 135 | + def get_max_intensity(self) -> int: |
| 136 | + """Maximum valid intensity that can be applied to a light channel.""" |
| 137 | + return int(self._conn.get_command(b'MAXINT', self._index_bytes)) |
| 138 | + |
| 139 | + def get_power_output(self) -> float: |
| 140 | + """Estimated power output for a given channel (in mW). |
| 141 | +
|
| 142 | + Power estimation is based on the power output calibration |
| 143 | + factor, power sensor reading, power sensor exposure, power |
| 144 | + sensor gain and crosstalk level. Estimation model assumes |
| 145 | + linear dependency. |
| 146 | + """ |
| 147 | + return float(self._conn.get_command(b'CHPWRWATTS', self._index_bytes)) |
| 148 | + |
| 149 | + def get_power_reference(self) -> float: |
| 150 | + """Light power reference (in mW). |
| 151 | +
|
| 152 | + This gives an indication of the power output when the channel |
| 153 | + is set to its maximum intensity. However, its dependent on |
| 154 | + being manually set and kept adjusted over time. |
| 155 | +
|
| 156 | + A value of -1 is returned if the power reference hasn't been |
| 157 | + defined yet. |
| 158 | + """ |
| 159 | + return float(self._conn.get_command(b'PWRREF', self._index_bytes)) |
| 160 | + |
| 161 | + def set_intensity(self, intensity: int) -> None: |
| 162 | + """Set light intensity between 0 and maximum intensity.""" |
| 163 | + self._conn.set_command(b'CHINT', self._index_bytes, b'%d' % intensity) |
| 164 | + |
| 165 | + |
| 166 | +class SpectraIIILightEngine(microscope.devices.ControllerDevice): |
| 167 | + """Spectra III Light Engine. |
| 168 | +
|
| 169 | + Args: |
| 170 | + port (str): port name (Windows) or path to port (everything |
| 171 | + else) to connect to. For example, `/dev/ttyS1`, `COM1`, |
| 172 | + or `/dev/cuad1`. |
| 173 | +
|
| 174 | + The names used on the devices dict are the ones provided by the |
| 175 | + Spectra engine. These are the colour names in capitals such as |
| 176 | + `'BLUE'`, `'NIR'`, or `'VIOLET'`. |
| 177 | +
|
| 178 | + Not all sources may be turned on simultaneously. To prevent |
| 179 | + exceeding the capacity of the DC power supply, power consumption |
| 180 | + is tracked by the Spectra onboard computer. If a set limit is |
| 181 | + exceeded, either by increasing intensity settings for sources that |
| 182 | + are already on, or by turning on additional sources, commands will |
| 183 | + be rejected. To clear the error condition, reduce intensities of |
| 184 | + sources that are on or turn off additional sources. |
| 185 | +
|
| 186 | + .. note:: |
| 187 | +
|
| 188 | + This relies on having power reference values set for each |
| 189 | + channel. The Spectra light engines do not provide a method to |
| 190 | + obtain the maximum power output or to set the power ouput. As |
| 191 | + such, this relies on the internal power reference value. This |
| 192 | + should be set manually on the device to obtain reasonable |
| 193 | + results when setting the power output. |
| 194 | +
|
| 195 | + """ |
| 196 | + def __init__(self, port: str, **kwargs) -> None: |
| 197 | + super().__init__(**kwargs) |
| 198 | + self._lights = {} # type: typing.Mapping[str, microscope.devices.Device] |
| 199 | + |
| 200 | + # We use standard (not legacy) mode communication so 115200,8,N,1 |
| 201 | + serial_conn = serial.Serial(port=port, baudrate=115200, timeout=1, |
| 202 | + bytesize=serial.EIGHTBITS, |
| 203 | + stopbits=serial.STOPBITS_ONE, |
| 204 | + parity=serial.PARITY_NONE, xonxoff=False, |
| 205 | + rtscts=False, dsrdtr=False) |
| 206 | + connection = _SpectraIIIConnection(_SyncSerial(serial_conn)) |
| 207 | + |
| 208 | + for index, name in connection.get_channel_map(): |
| 209 | + if name in self._lights: |
| 210 | + raise RuntimeError('multiple lights with name \'%s\'' % name) |
| 211 | + self._lights[name] = _SpectraIIILightChannel(connection, index) |
| 212 | + |
| 213 | + @property |
| 214 | + def devices(self) -> typing.Mapping[str, microscope.devices.Device]: |
| 215 | + return self._lights |
| 216 | + |
| 217 | + |
| 218 | +class _SpectraIIILightChannel(microscope.devices.LaserDevice): |
| 219 | + """A single light channel from a light engine. |
| 220 | +
|
| 221 | + A channel is not necessarily a lasers although it subclasses from |
| 222 | + `LaserDevice`. Constituent light sources may include LEDs, |
| 223 | + luminescent light pipes, or lasers. |
| 224 | + """ |
| 225 | + def __init__(self, connection: _SpectraIIIConnection, index: int) -> None: |
| 226 | + super().__init__() |
| 227 | + self._conn = _LightChannelConnection(connection, index) |
| 228 | + # The lumencor only allows to set the power via intensity |
| 229 | + # levels (values between 0 and MAXINT) . There is no method |
| 230 | + # to query the maximum power output, that information is on |
| 231 | + # the device certificate of conformance and may changes over |
| 232 | + # time. |
| 233 | + # |
| 234 | + # Power Reference is close to the max possible power (mw). It |
| 235 | + # seems to do nothing other than providing a estimate of what |
| 236 | + # power will be emitted when the intensity is at its maximum. |
| 237 | + # It needs to be set manually and kept up to date. |
| 238 | + self._power_ref = self._conn.get_power_reference() # type: float |
| 239 | + if self._power_ref == -1: |
| 240 | + raise RuntimeError('Power reference value is not set') |
| 241 | + self._max_intensity = self._conn.get_max_intensity() # type: int |
| 242 | + |
| 243 | + def initialize(self) -> None: |
| 244 | + pass |
| 245 | + |
| 246 | + def _on_shutdown(self) -> None: |
| 247 | + # There is a shutdown command but this actually powers off the |
| 248 | + # device which is not what LaserDevice.shutdown() is meant to |
| 249 | + # do. So do nothing. |
| 250 | + pass |
| 251 | + |
| 252 | + def get_status(self) -> typing.List[str]: |
| 253 | + status = [] # type: typing.List[str] |
| 254 | + return status |
| 255 | + |
| 256 | + def enable(self) -> None: |
| 257 | + self._conn.set_light_state(True) |
| 258 | + |
| 259 | + def disable(self) -> None: |
| 260 | + self._conn.set_light_state(False) |
| 261 | + |
| 262 | + def get_is_on(self) -> bool: |
| 263 | + return self._conn.get_light_state() |
| 264 | + |
| 265 | + def get_min_power_mw(self) -> float: |
| 266 | + return 0.0 |
| 267 | + |
| 268 | + def get_max_power_mw(self) -> float: |
| 269 | + return self._power_ref |
| 270 | + |
| 271 | + def get_power_mw(self) -> float: |
| 272 | + return self._conn.get_power_output() |
| 273 | + |
| 274 | + def _set_power_mw(self, mw: float) -> None: |
| 275 | + # The mw argument should have already been clipped by |
| 276 | + # `LaserDevice.set_power_mw()` so no need to do so again. |
| 277 | + intensity = int((mw / self._power_ref) * self._max_intensity) |
| 278 | + self._conn.set_intensity(intensity) |
0 commit comments