|
| 1 | +#!/usr/bin/env python |
| 2 | +# -*- coding: utf-8 |
| 3 | + |
| 4 | +# Copyright (C) 2019 David Miguel Susano Pinto <david.pinto@bioch.ox.ac.uk> |
| 5 | +# |
| 6 | +# Microscope is free software: you can redistribute it and/or modify |
| 7 | +# it under the terms of the GNU General Public License as published by |
| 8 | +# the Free Software Foundation, either version 3 of the License, or |
| 9 | +# (at your option) any later version. |
| 10 | +# |
| 11 | +# Microscope is distributed in the hope that it will be useful, |
| 12 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 13 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 14 | +# GNU General Public License for more details. |
| 15 | +# |
| 16 | +# You should have received a copy of the GNU General Public License |
| 17 | +# along with Microscope. If not, see <http://www.gnu.org/licenses/>. |
| 18 | + |
| 19 | +import logging |
| 20 | +import re |
| 21 | +import threading |
| 22 | +import typing |
| 23 | + |
| 24 | +import serial |
| 25 | + |
| 26 | +import microscope.devices |
| 27 | + |
| 28 | + |
| 29 | +_logger = logging.getLogger(__name__) |
| 30 | + |
| 31 | + |
| 32 | +class _SharedSerial: |
| 33 | + def __init__(self, serial: serial.Serial) -> None: |
| 34 | + self._serial = serial |
| 35 | + self._lock = threading.RLock() |
| 36 | + |
| 37 | + @property |
| 38 | + def lock(self) -> threading.RLock: |
| 39 | + return self._lock |
| 40 | + |
| 41 | + def readline(self) -> bytes: |
| 42 | + with self._lock: |
| 43 | + return self._serial.readline() |
| 44 | + |
| 45 | + def read_until(self, terminator: bytes = b'\n', |
| 46 | + size: typing.Optional[int] = None) -> bytes: |
| 47 | + with self._lock: |
| 48 | + return self._serial.read_until(terminator=terminator, size=size) |
| 49 | + |
| 50 | + def write(self, data: bytes) -> int: |
| 51 | + with self._lock: |
| 52 | + return self._serial.write(data) |
| 53 | + |
| 54 | + def readlines(self, hint: int = -1) -> bytes: |
| 55 | + with self._lock: |
| 56 | + return self._serial.readlines(hint) |
| 57 | + |
| 58 | + |
| 59 | +def _get_table_value(table: bytes, key: bytes) -> bytes: |
| 60 | + """Get the value for a key in a table/multiline output. |
| 61 | +
|
| 62 | + Some commands return something like a table of key/values. There |
| 63 | + may be even empty lines on this table. This searches for the |
| 64 | + first line with a specific key (hopefully there's only one line |
| 65 | + with such key) and returns the associated value. |
| 66 | + """ |
| 67 | + # Key might be the first line, hence '(?:^|\r\n)' |
| 68 | + match = re.search(b'(?:^|\r\n) *' + key + b': (.*)\r\n', table) |
| 69 | + if match is None: |
| 70 | + raise RuntimeError('failed to find key %s on table: %s' % (key, table)) |
| 71 | + return match.group(1) |
| 72 | + |
| 73 | + |
| 74 | +class _iBeamConnection: |
| 75 | + """Connection to a specific Toptica iBeam smart laser. |
| 76 | +
|
| 77 | + This class wraps the serial connection to the device, and provides |
| 78 | + access to some of its commands performing most of the parsing and |
| 79 | + validation. |
| 80 | +
|
| 81 | + Args: |
| 82 | + port (str): port name (Windows) or path to port (everything |
| 83 | + else) to connect to. For example, `/dev/ttyS1`, `COM1`, |
| 84 | + or `/dev/cuad1`. |
| 85 | + """ |
| 86 | + def __init__(self, port: str): |
| 87 | + # From the Toptica iBeam SMART manual: |
| 88 | + # Direct connection via COMx with 115200,8,N,1 and serial |
| 89 | + # interface handshake "none". That means that no hardware |
| 90 | + # handshake (DTR, RTS) and no software handshake (XON,XOFF) of |
| 91 | + # the underlying operating system is supported. |
| 92 | + serial_conn = serial.Serial(port=port, baudrate=115200, timeout=1.0, |
| 93 | + bytesize=serial.EIGHTBITS, |
| 94 | + stopbits=serial.STOPBITS_ONE, |
| 95 | + parity=serial.PARITY_NONE, xonxoff=False, |
| 96 | + rtscts=False, dsrdtr=False) |
| 97 | + self._serial = _SharedSerial(serial_conn) |
| 98 | + |
| 99 | + # We don't know what is the current verbosity state and so we |
| 100 | + # don't know yet what we should be reading back. So blindly |
| 101 | + # set to the level we want, flush all output, and then check |
| 102 | + # if indeed this is a Toptica iBeam device. |
| 103 | + with self._serial.lock: |
| 104 | + self._serial.write(b'echo off\r\n') |
| 105 | + self._serial.write(b'prompt off\r\n') |
| 106 | + # The talk level we want is 'usual'. In theory we should |
| 107 | + # be able to use 'quiet' which only answers queries but in |
| 108 | + # practice 'quiet' does not answer some queries like 'show |
| 109 | + # serial'. |
| 110 | + self._serial.write(b'talk usual\r\n') |
| 111 | + self._serial.readlines() # discard all pending lines |
| 112 | + |
| 113 | + # Empty command does nothing and returns nothing extra so we |
| 114 | + # use it to ensure this at least behaves like a Toptica iBeam. |
| 115 | + self.command(b'') |
| 116 | + |
| 117 | + answer = self.command(b'show serial') |
| 118 | + if not answer.startswith(b'SN: '): |
| 119 | + raise RuntimeError('Failed to parse serial from %s' % answer) |
| 120 | + _logger.info("got connection to Toptica iBeam %s", answer.decode()) |
| 121 | + |
| 122 | + def command(self, command: bytes) -> bytes: |
| 123 | + """Run command and return answer after minimal validation. |
| 124 | +
|
| 125 | + The output of a command has the format:: |
| 126 | +
|
| 127 | + \r\nANSWER[OK]\r\n |
| 128 | +
|
| 129 | + The returned bytes only include `ANSWER` without its own final |
| 130 | + `\r\n`. This means that the return value might be an empty |
| 131 | + array of bytes. |
| 132 | + """ |
| 133 | + # We expect to be on 'talk usual' mode without prompt so each |
| 134 | + # command will end with [OK] on its own line. |
| 135 | + with self._serial.lock: |
| 136 | + self._serial.write(command + b'\r\n') |
| 137 | + # An answer always starts with \r\n so there will be one |
| 138 | + # before [OK] even if this command is not a query. |
| 139 | + answer = self._serial.read_until(b'\r\n[OK]\r\n') |
| 140 | + |
| 141 | + if not answer.startswith(b'\r\n'): |
| 142 | + raise RuntimeError('answer to command %s does not start with CRLF.' |
| 143 | + ' This may be leftovers from a previous command:' |
| 144 | + ' %s' % (command, answer)) |
| 145 | + if not answer.endswith(b'\r\n[OK]\r\n'): |
| 146 | + raise RuntimeError('Command %s failed or failed to read answer: %s' |
| 147 | + % (command, answer)) |
| 148 | + |
| 149 | + # If an error occurred, the answer still ends in [OK]. We |
| 150 | + # need to check if the second line (first line is \r\n) is an |
| 151 | + # error code with the format "%SYS-L-XXX, error description" |
| 152 | + # where L is the error level (I for Information, W for |
| 153 | + # Warning, E for Error, and F for Fatal), and XXX is the error |
| 154 | + # code number. |
| 155 | + if answer[2:7] == b'%SYS-' and answer[7] != ord(b'I'): |
| 156 | + # Errors of level I (information) should not raise an |
| 157 | + # exception since they can be replies to normal commands. |
| 158 | + raise RuntimeError('Command %s failed: %s' % (command, answer)) |
| 159 | + |
| 160 | + # Exclude the first \r\n, the \r\n from a possible answer, and |
| 161 | + # the final [OK]\r\n |
| 162 | + return answer[2:-8] |
| 163 | + |
| 164 | + def laser_on(self) -> None: |
| 165 | + """Activate LD driver.""" |
| 166 | + self.command(b'laser on') |
| 167 | + |
| 168 | + def laser_off(self) -> None: |
| 169 | + """Deactivate LD driver.""" |
| 170 | + self.command(b'laser off') |
| 171 | + |
| 172 | + def set_normal_channel_power(self, power: float) -> None: |
| 173 | + """Set power in mW for channel 2 (normal operating level channel). |
| 174 | +
|
| 175 | + We don't have channel number as an argument because we only |
| 176 | + want to be setting the power via channel 2 (channel 1 is the |
| 177 | + bias and we haven't seen a laser with a channel 3 yet). |
| 178 | + """ |
| 179 | + self.command(b'channel 2 power %f' % power) |
| 180 | + |
| 181 | + def show_power_uW(self) -> float: |
| 182 | + """Returns actual laser power in µW.""" |
| 183 | + answer = self.command(b'show power') |
| 184 | + if (not answer.startswith(b'PIC = ') |
| 185 | + and not answer.endswith(b' uW ')): |
| 186 | + raise RuntimeError('failed to parse power from answer: %s' % answer) |
| 187 | + return float(answer[7:-5]) |
| 188 | + |
| 189 | + def status_laser(self) -> bytes: |
| 190 | + """Returns actual status of the LD driver (ON or OFF).""" |
| 191 | + return self.command(b'status laser') |
| 192 | + |
| 193 | + def show_max_power(self) -> float: |
| 194 | + # There should be a cleaner way to get these, right? We can |
| 195 | + # query the current limits (mA) but how do we go from there to |
| 196 | + # the power limits (mW)? |
| 197 | + table = self.command(b'show satellite') |
| 198 | + key = _get_table_value(table, b'Pmax') |
| 199 | + if not key.endswith(b' mW'): |
| 200 | + raise RuntimeError('failed to parse power from %s' % key) |
| 201 | + return float(key[:-3]) |
| 202 | + |
| 203 | + def show_bias_power(self) -> float: |
| 204 | + """Return power level for the bias (channel 1) in mW.""" |
| 205 | + # We seem to only need this command to get the bias power |
| 206 | + # level. If we ever need to use it to get the other channels, |
| 207 | + # we should be returning the list of levels. |
| 208 | + table = self.command(b'show level power') |
| 209 | + key = _get_table_value(table, b'CH1, PWR') |
| 210 | + if not key.endswith(b' mW'): |
| 211 | + raise RuntimeError('failed to parse power from %s' % key) |
| 212 | + return float(key[:-3]) |
| 213 | + |
| 214 | + |
| 215 | +class TopticaiBeam(microscope.devices.LaserDevice): |
| 216 | + """Toptica iBeam smart laser. |
| 217 | +
|
| 218 | + Control of laser power is performed by setting the power level on |
| 219 | + the normal channel (#2) only. The bias channel (#1) is left |
| 220 | + unmodified and so defines the lowest level power. |
| 221 | + """ |
| 222 | + def __init__(self, port: str, **kwargs) -> None: |
| 223 | + super().__init__(**kwargs) |
| 224 | + self._conn = _iBeamConnection(port) |
| 225 | + # The Toptica iBeam has up to five operation modes, named |
| 226 | + # "channels" on the documentation. Only the first three |
| 227 | + # channels have any sort of documentation: |
| 228 | + # |
| 229 | + # Ch 1: bias level channel |
| 230 | + # Ch 2: normal operating level channel |
| 231 | + # Ch 3: only used at high-power models |
| 232 | + # |
| 233 | + # We haven't come across a laser with a channel 3 so we are |
| 234 | + # ignoring it until then. So we just leave the bias channel |
| 235 | + # (1) alone and control power via the normal channel (2). |
| 236 | + self._bias_power = self._conn.show_bias_power() |
| 237 | + self._max_power = self._conn.show_max_power() |
| 238 | + |
| 239 | + def initialize(self) -> None: |
| 240 | + pass |
| 241 | + |
| 242 | + def _on_shutdown(self) -> None: |
| 243 | + pass |
| 244 | + |
| 245 | + def get_status(self) -> typing.List[str]: |
| 246 | + status = [] # type: typing.List[str] |
| 247 | + return status |
| 248 | + |
| 249 | + def enable(self) -> None: |
| 250 | + self._conn.laser_on() |
| 251 | + |
| 252 | + def disable(self) -> None: |
| 253 | + self._conn.laser_off() |
| 254 | + |
| 255 | + def get_is_on(self) -> bool: |
| 256 | + state = self._conn.status_laser() |
| 257 | + if state == b'ON': |
| 258 | + return True |
| 259 | + elif state == b'OFF': |
| 260 | + return False |
| 261 | + else: |
| 262 | + raise RuntimeError('Unexpected laser status: %s' % state.decode()) |
| 263 | + |
| 264 | + def get_min_power_mw(self) -> float: |
| 265 | + return self._bias_power |
| 266 | + |
| 267 | + def get_max_power_mw(self) -> float: |
| 268 | + return self._max_power |
| 269 | + |
| 270 | + def get_power_mw(self) -> float: |
| 271 | + return (self._conn.show_power_uW() * (10**-3)) |
| 272 | + |
| 273 | + def _set_power_mw(self, mw: float) -> None: |
| 274 | + self._conn.set_normal_channel_power(mw) |
0 commit comments