|
| 1 | +from typing import Sequence, Mapping, Iterable, Optional, Union, ContextManager |
| 2 | +from dataclasses import dataclass |
| 3 | + |
| 4 | +import numpy |
| 5 | +from rich.measure import Measurement |
| 6 | + |
| 7 | +from qupulse.utils.types import TimeType |
| 8 | +from qupulse.program import (ProgramBuilder, Program, HardwareVoltage, HardwareTime, |
| 9 | + MeasurementWindow, Waveform, RepetitionCount, SimpleExpression) |
| 10 | +from qupulse.parameter_scope import Scope |
| 11 | + |
| 12 | + |
| 13 | +@dataclass |
| 14 | +class MeasurementNode: |
| 15 | + windows: Sequence[MeasurementWindow] |
| 16 | + duration: HardwareTime |
| 17 | + |
| 18 | + |
| 19 | +@dataclass |
| 20 | +class MeasurementRepetition(MeasurementNode): |
| 21 | + body: MeasurementNode |
| 22 | + count: RepetitionCount |
| 23 | + |
| 24 | +@dataclass |
| 25 | +class MeasurementSequence(MeasurementNode): |
| 26 | + nodes: Sequence[tuple[HardwareTime, MeasurementNode]] |
| 27 | + |
| 28 | + |
| 29 | +@dataclass |
| 30 | +class MeasurementFrame: |
| 31 | + commands: list['Command'] |
| 32 | + has_duration: bool |
| 33 | + |
| 34 | +MeasurementID = str | int |
| 35 | + |
| 36 | + |
| 37 | +class MeasurementBuilder(ProgramBuilder): |
| 38 | + def __init__(self): |
| 39 | + super().__init__() |
| 40 | + |
| 41 | + self._frames = [MeasurementFrame([], False)] |
| 42 | + self._ranges: list[tuple[str, range]] = [] |
| 43 | + self._repetitions = [] |
| 44 | + self._measurements = [] |
| 45 | + self._label_counter = 0 |
| 46 | + |
| 47 | + def _with_new_frame(self, measurements): |
| 48 | + self._frames.append(MeasurementFrame([], False)) |
| 49 | + yield self |
| 50 | + frame = self._frames.pop() |
| 51 | + if not frame.has_duration: |
| 52 | + return |
| 53 | + parent = self._frames[-1] |
| 54 | + parent.has_duration = True |
| 55 | + if measurements: |
| 56 | + parent.commands.extend(map(Measure, measurements)) |
| 57 | + return frame.commands |
| 58 | + |
| 59 | + def inner_scope(self, scope: Scope) -> Scope: |
| 60 | + """This function is necessary to inject program builder specific parameter implementations into the build |
| 61 | + process.""" |
| 62 | + if self._ranges: |
| 63 | + name, rng = self._ranges[-1] |
| 64 | + return scope.overwrite({name: SimpleExpression(base=rng.start, offsets={name: rng.step})}) |
| 65 | + else: |
| 66 | + return scope |
| 67 | + |
| 68 | + def hold_voltage(self, duration: HardwareTime, voltages: Mapping[str, HardwareVoltage]): |
| 69 | + """Supports dynamic i.e. for loop generated offsets and duration""" |
| 70 | + self._frames[-1].commands.append(Wait(duration)) |
| 71 | + self._frames[-1].has_duration = True |
| 72 | + |
| 73 | + def play_arbitrary_waveform(self, waveform: Waveform): |
| 74 | + """""" |
| 75 | + self._frames[-1].commands.append(Wait(waveform.duration)) |
| 76 | + self._frames[-1].has_duration = True |
| 77 | + |
| 78 | + def measure(self, measurements: Optional[Sequence[MeasurementWindow]]): |
| 79 | + """Unconditionally add given measurements relative to the current position.""" |
| 80 | + if measurements: |
| 81 | + commands = self._frames[-1].commands |
| 82 | + commands.extend(Measure(*meas) for meas in measurements) |
| 83 | + self._frames[-1].has_duration = True |
| 84 | + |
| 85 | + def with_repetition(self, repetition_count: RepetitionCount, |
| 86 | + measurements: Optional[Sequence[MeasurementWindow]] = None) -> Iterable['ProgramBuilder']: |
| 87 | + """Measurements that are added to the new builder are dropped if the builder is empty upon exit""" |
| 88 | + new_commands = yield from self._with_new_frame(measurements) |
| 89 | + if new_commands is None: |
| 90 | + return |
| 91 | + parent = self._frames[-1] |
| 92 | + |
| 93 | + self._label_counter += 1 |
| 94 | + label_idx = self._label_counter |
| 95 | + parent.commands.append(LoopLabel(idx=label_idx, runtime_name=None, count=RepetitionCount)) |
| 96 | + parent.commands.extend(new_commands) |
| 97 | + parent.commands.append(LoopJmp(idx=label_idx)) |
| 98 | + |
| 99 | + def with_sequence(self, |
| 100 | + measurements: Optional[Sequence[MeasurementWindow]] = None) -> ContextManager['ProgramBuilder']: |
| 101 | + """ |
| 102 | +
|
| 103 | + Measurements that are added in to the returned program builder are discarded if the sequence is empty on exit. |
| 104 | +
|
| 105 | + Args: |
| 106 | + measurements: Measurements to attach to the potential child. |
| 107 | + Returns: |
| 108 | + """ |
| 109 | + new_commands = yield from self._with_new_frame(measurements) |
| 110 | + if new_commands is None: |
| 111 | + return |
| 112 | + parent = self._frames[-1] |
| 113 | + parent.commands.extend(new_commands) |
| 114 | + |
| 115 | + def new_subprogram(self, global_transformation: 'Transformation' = None) -> ContextManager['ProgramBuilder']: |
| 116 | + """Create a context managed program builder whose contents are translated into a single waveform upon exit if |
| 117 | + it is not empty.""" |
| 118 | + yield self |
| 119 | + |
| 120 | + def with_iteration(self, index_name: str, rng: range, |
| 121 | + measurements: Optional[Sequence[MeasurementWindow]] = None) -> Iterable['ProgramBuilder']: |
| 122 | + self._ranges.append((index_name, rng)) |
| 123 | + new_commands = yield from self._with_new_frame(measurements) |
| 124 | + self._ranges.pop() |
| 125 | + if new_commands is None: |
| 126 | + return |
| 127 | + parent = self._frames[-1] |
| 128 | + |
| 129 | + self._label_counter += 1 |
| 130 | + label_idx = self._label_counter |
| 131 | + parent.commands.append(LoopLabel(idx=label_idx, runtime_name=index_name, count=len(rng))) |
| 132 | + parent.commands.extend(new_commands) |
| 133 | + parent.commands.append(LoopJmp(idx=label_idx)) |
| 134 | + |
| 135 | + def time_reversed(self) -> ContextManager['ProgramBuilder']: |
| 136 | + self._frames.append(MeasurementFrame([], False)) |
| 137 | + yield self |
| 138 | + frame = self._frames.pop() |
| 139 | + if not frame.has_duration: |
| 140 | + return |
| 141 | + |
| 142 | + self._frames[-1].has_duration = True |
| 143 | + self._frames[-1].commands.extend(_reversed_commands(frame.commands)) |
| 144 | + |
| 145 | + def to_program(self) -> Optional[Program]: |
| 146 | + """Further addition of new elements might fail after finalizing the program.""" |
| 147 | + if self._frames[0].has_duration: |
| 148 | + return self._frames[0].commands |
| 149 | + |
| 150 | + |
| 151 | +@dataclass |
| 152 | +class LoopLabel: |
| 153 | + idx: int |
| 154 | + runtime_name: str | None |
| 155 | + count: RepetitionCount |
| 156 | + |
| 157 | + |
| 158 | +@dataclass |
| 159 | +class Measure: |
| 160 | + meas_id: MeasurementID |
| 161 | + delay: HardwareTime |
| 162 | + length: HardwareTime |
| 163 | + |
| 164 | + |
| 165 | +@dataclass |
| 166 | +class Wait: |
| 167 | + duration: HardwareTime |
| 168 | + |
| 169 | + |
| 170 | +@dataclass |
| 171 | +class LoopJmp: |
| 172 | + idx: int |
| 173 | + |
| 174 | + |
| 175 | +Command = Union[LoopLabel, LoopJmp, Wait, Measure] |
| 176 | + |
| 177 | + |
| 178 | +def _reversed_commands(cmds: Sequence[Command]) -> Sequence[Command]: |
| 179 | + reversed_cmds = [] |
| 180 | + jumps = {} |
| 181 | + for cmd in reversed(cmds): |
| 182 | + if isinstance(cmd, LoopJmp): |
| 183 | + jumps[cmd.idx] = len(reversed_cmds) |
| 184 | + reversed_cmds.append(cmd) |
| 185 | + elif isinstance(cmd, LoopLabel): |
| 186 | + jump_idx = jumps[cmd.idx] |
| 187 | + jump = reversed_cmds[jump_idx] |
| 188 | + reversed_cmds[jump_idx] = cmd |
| 189 | + reversed_cmds.append(jump) |
| 190 | + |
| 191 | + elif isinstance(cmd, Measure): |
| 192 | + if isinstance(cmd.delay, SimpleExpression) or isinstance(cmd.delay, SimpleExpression): |
| 193 | + raise NotImplementedError("TODO") |
| 194 | + reversed_cmds.append(Measure(meas_id=cmd.meas_id, |
| 195 | + delay=-(cmd.delay + cmd.length), |
| 196 | + length=cmd.length,)) |
| 197 | + elif isinstance(cmd, Wait): |
| 198 | + reversed_cmds.append(cmd) |
| 199 | + else: |
| 200 | + raise ValueError("Not a command", cmd) |
| 201 | + |
| 202 | + return reversed_cmds |
| 203 | + |
| 204 | + |
| 205 | +def to_table(commands: Sequence[Command]) -> dict[str, numpy.ndarray]: |
| 206 | + time = TimeType(0) |
| 207 | + |
| 208 | + memory = {} |
| 209 | + counts = [None] |
| 210 | + |
| 211 | + tables = {} |
| 212 | + |
| 213 | + def eval_hardware_time(t: HardwareTime): |
| 214 | + if isinstance(t, SimpleExpression): |
| 215 | + value = t.base |
| 216 | + for (factor_name, factor_val) in t.offsets.items(): |
| 217 | + count = counts[memory[factor_name]] |
| 218 | + value += factor_val * count |
| 219 | + return value |
| 220 | + else: |
| 221 | + return t |
| 222 | + |
| 223 | + def execute(sequence: Sequence[Command]) -> int: |
| 224 | + nonlocal time |
| 225 | + nonlocal tables |
| 226 | + nonlocal memory |
| 227 | + nonlocal counts |
| 228 | + |
| 229 | + skip = 0 |
| 230 | + for idx, cmd in enumerate(sequence): |
| 231 | + if idx < skip: |
| 232 | + continue |
| 233 | + if isinstance(cmd, LoopJmp): |
| 234 | + return idx |
| 235 | + elif isinstance(cmd, LoopLabel): |
| 236 | + if cmd.runtime_name: |
| 237 | + memory[cmd.runtime_name] = cmd.idx |
| 238 | + if cmd.idx == len(counts): |
| 239 | + counts.append(0) |
| 240 | + assert cmd.idx < len(counts) |
| 241 | + |
| 242 | + for iter_val in range(cmd.count): |
| 243 | + counts[cmd.idx] = iter_val |
| 244 | + pos = execute(sequence[idx + 1:]) |
| 245 | + skip = idx + pos + 2 |
| 246 | + elif isinstance(cmd, Measure): |
| 247 | + meas_time = float(eval_hardware_time(cmd.delay) + time) |
| 248 | + meas_len = float(eval_hardware_time(cmd.length)) |
| 249 | + tables.setdefault(cmd.meas_id, []).append((meas_time, meas_len)) |
| 250 | + elif isinstance(cmd, Wait): |
| 251 | + time += eval_hardware_time(cmd.duration) |
| 252 | + |
| 253 | + execute(commands) |
| 254 | + return { |
| 255 | + name: numpy.array(values) for name, values in tables.items() |
| 256 | + } |
0 commit comments