diff --git a/.gitignore b/.gitignore index 26860cc..b510c1d 100644 --- a/.gitignore +++ b/.gitignore @@ -172,6 +172,7 @@ matchmaker/utils/distances.c tests/dev*.py tests/tmp/ matchmaker/rl/* +matchmaker/prob/viterbi_step.c *~ tests/test_io_error_trace.txt diff --git a/matchmaker/assets/mozart_k265_var1_annotations_trim.txt b/matchmaker/assets/mozart_k265_var1_annotations_trim.txt new file mode 100644 index 0000000..0f24c93 --- /dev/null +++ b/matchmaker/assets/mozart_k265_var1_annotations_trim.txt @@ -0,0 +1,17 @@ +0.028146 0.028146 +0.483875 0.483875 +0.925021 0.925021 +1.388042 1.388042 +1.816167 1.816167 +2.251063 2.251063 +2.73075 2.73075 +3.226583 3.226583 +3.681792 3.681792 +4.142208 4.142208 +4.604188 4.604188 +5.050542 5.050542 +5.509917 5.509917 +5.997417 5.997417 +6.504187 6.504187 +7.05575 7.05575 +7.566167 7.566167 diff --git a/matchmaker/assets/mozart_k265_var1_short_trim.mp3 b/matchmaker/assets/mozart_k265_var1_short_trim.mp3 new file mode 100644 index 0000000..2825145 Binary files /dev/null and b/matchmaker/assets/mozart_k265_var1_short_trim.mp3 differ diff --git a/matchmaker/dp/__init__.py b/matchmaker/dp/__init__.py index 9a23ee7..3903e5f 100644 --- a/matchmaker/dp/__init__.py +++ b/matchmaker/dp/__init__.py @@ -5,7 +5,8 @@ """ from .oltw_arzt import OnlineTimeWarpingArzt -from .oltw_dixon import * +from .oltw_dixon import OnlineTimeWarpingDixon # Alias OLTWArzt = OnlineTimeWarpingArzt +OLTWDixon = OnlineTimeWarpingDixon diff --git a/matchmaker/dp/oltw_arzt.py b/matchmaker/dp/oltw_arzt.py index f087017..89d0d5a 100644 --- a/matchmaker/dp/oltw_arzt.py +++ b/matchmaker/dp/oltw_arzt.py @@ -13,7 +13,7 @@ from matchmaker.base import OnlineAlignment from matchmaker.dp.dtw_loop import oltw_arzt_loop -from matchmaker.features.audio import FRAME_RATE, QUEUE_TIMEOUT, WINDOW_SIZE +from matchmaker.features.audio import FRAME_RATE, QUEUE_TIMEOUT from matchmaker.utils import ( CYTHONIZED_METRICS_W_ARGUMENTS, CYTHONIZED_METRICS_WO_ARGUMENTS, @@ -28,6 +28,7 @@ ) STEP_SIZE: int = 5 +WINDOW_SIZE: int = 5 START_WINDOW_SIZE: Union[float, int] = 0.25 @@ -304,4 +305,4 @@ def step(self, input_features: NDArray[np.float32]) -> None: if __name__ == "__main__": - pass + pass # pragma: no cover diff --git a/matchmaker/dp/oltw_dixon.py b/matchmaker/dp/oltw_dixon.py index b7e2e7c..38905e8 100644 --- a/matchmaker/dp/oltw_dixon.py +++ b/matchmaker/dp/oltw_dixon.py @@ -1,12 +1,12 @@ #!/usr/bin/python # -*- coding: utf-8 -*- """ -On-line Dynamic Time Warping +On-line Dynamic Time Warping (Dixon 2005) """ import time from enum import IntEnum -from typing import Callable, Dict +from typing import Dict import numpy as np import progressbar @@ -14,7 +14,7 @@ from numpy.typing import NDArray from matchmaker.base import OnlineAlignment -from matchmaker.features.audio import FRAME_RATE, QUEUE_TIMEOUT, WINDOW_SIZE +from matchmaker.features.audio import FRAME_RATE, QUEUE_TIMEOUT from matchmaker.utils.misc import set_latency_stats @@ -23,12 +23,10 @@ class Direction(IntEnum): TARGET = 1 BOTH = 2 - def toggle(self): - return Direction(self ^ 1) if self != Direction.BOTH else Direction.TARGET - -MAX_RUN_COUNT: int = 30 +MAX_RUN_COUNT: int = 3 FRAME_PER_SEG = 1 +WINDOW_SIZE = 10 # seconds class OnlineTimeWarpingDixon(OnlineAlignment): @@ -48,7 +46,8 @@ class OnlineTimeWarpingDixon(OnlineAlignment): Size of the window for searching the optimal path in the cumulative cost matrix. max_run_count : int - Maximum number of times the class can run in the same direction. + Maximum consecutive advances in one direction before the other + direction is forced. Constrains the slope to [1/max_run_count, max_run_count]. frame_per_seg : int Number of frames per segment (audio chunk). @@ -59,13 +58,12 @@ class OnlineTimeWarpingDixon(OnlineAlignment): Attributes ---------- warping_path : np.ndarray [shape=(2, T)] - Warping path with pairs of indices of the reference and target features. - where warping_path[0] is the index of the reference feature and warping_path[1] is the index of the target(input) feature. + Warping path. ``warping_path[0]`` = reference index, + ``warping_path[1]`` = input index. """ DEFAULT_DISTANCE_FUNC: str = "euclidean" distance_func: str - vdist: Callable[[NDArray[np.float32]], NDArray[np.float32]] def __init__( self, @@ -79,7 +77,6 @@ def __init__( **kwargs, ): super().__init__(reference_features=reference_features) - self.input_features = np.empty((0, self.reference_features.shape[1])) self.queue = queue self.N_ref = self.reference_features.shape[0] self.frame_rate = frame_rate @@ -87,12 +84,21 @@ def __init__( self.distance_func = distance_func.lower() self.max_run_count = max_run_count self.frame_per_seg = frame_per_seg - self.current_position = 0 + self.reset() + + def reset(self): + """Reset mutable state for a new alignment run.""" + self.input_features = np.empty((0, self.reference_features.shape[1])) + self.acc_dist_matrix = np.full((self.w, self.w), np.inf, dtype=np.float32) self.wp = np.array([[0, 0]]).T # [shape=(2, T)] self.ref_pointer = 0 self.input_pointer = 0 - self.input_index: int = 0 - self.previous_direction = None + self.run_count_ref = 0 + self.run_count_input = 0 + self.best_ref = 0 + self.best_input = 0 + self.current_position = 0 + self.input_index = 0 self.last_queue_update = time.time() self.latency_stats: Dict[str, float] = { "total_latency": 0, @@ -100,257 +106,231 @@ def __init__( "max_latency": 0, "min_latency": float("inf"), } + self._initialized = False @property def warping_path(self) -> NDArray[np.float32]: # [shape=(2, T)] return self.wp + def _ref_offset(self): + return max(0, self.ref_pointer - self.w) + + def _input_offset(self): + return max(0, self.input_pointer - self.w) + + def _ref_win_size(self): + return min(self.w, self.ref_pointer) + + def _input_win_size(self): + return min(self.w, self.input_pointer) + def offset(self): - offset_x = max(self.ref_pointer - self.w, 0) - offset_y = max(self.input_pointer - self.w, 0) - return np.array([offset_x, offset_y]) - - def init_matrix(self): - x = self.ref_pointer - y = self.input_pointer - d = self.frame_per_seg - wx = min(self.w, x) - wy = min(self.w, y) - new_acc = np.zeros((wx, wy)) - new_len_acc = np.zeros((wx, wy)) - x_seg = self.reference_features[x - wx : x] # [wx, 12] - y_seg = self.input_features[min(y - d, 0) : y] # [d, 12] - dist = scipy.spatial.distance.cdist( - x_seg, y_seg, metric=self.distance_func - ) # [wx, d] - - for i in range(wx): - for j in range(d): - local_dist = dist[i, j] - update_x0 = 0 - update_y0 = wy - d - if i == 0 and j == 0: - new_acc[i, j] = local_dist - elif i == 0: - new_acc[i, update_y0 + j] = local_dist + new_acc[i, update_y0 - 1] - new_len_acc[i, update_y0 + j] = 1 + new_len_acc[i, update_y0 - 1] - elif j == 0: - new_acc[i, update_y0 + j] = local_dist + new_acc[i - 1, update_y0] - new_len_acc[i, update_y0 + j] = ( - local_dist + new_len_acc[i - 1, update_y0] - ) - self.acc_dist_matrix = new_acc - self.acc_len_matrix = new_len_acc - self.select_candidate() - - def update_ref_direction(self, dist, new_acc, new_len_acc, wx, wy, d): - for i in range(d): - for j in range(wy): - local_dist = dist[i, j] - update_x0 = wx - d - update_y0 = 0 - if j == 0: - new_acc[update_x0 + i, j] = ( - local_dist + new_acc[update_x0 + i - 1, j] - ) - new_len_acc[update_x0 + i, j] = ( - new_len_acc[update_x0 + i - 1, j] + 1 - ) - else: - compares = [ - new_acc[update_x0 + i - 1, j], - new_acc[update_x0 + i, j - 1], - new_acc[update_x0 + i - 1, j - 1] * 0.98, - ] - len_compares = [ - new_len_acc[update_x0 + i - 1, j], - new_len_acc[update_x0 + i, j - 1], - new_len_acc[update_x0 + i - 1, j - 1], - ] - local_direction = np.argmin(compares) - new_acc[update_x0 + i, j] = local_dist + compares[local_direction] - new_len_acc[update_x0 + i, j] = 1 + len_compares[local_direction] - return new_acc, new_len_acc - - def update_target_direction(self, dist, new_acc, new_len_acc, wx, wy, d): - for i in range(wx): - for j in range(d): - local_dist = dist[i, j] - update_x0 = 0 - update_y0 = wy - d - if i == 0: - new_acc[i, update_y0 + j] = local_dist + new_acc[i, update_y0 - 1] - new_len_acc[i, update_y0 + j] = 1 + new_len_acc[i, update_y0 - 1] - else: - compares = [ - new_acc[i - 1, update_y0 + j], - new_acc[i, update_y0 + j - 1], - new_acc[i - 1, update_y0 + j - 1] * 0.98, - ] - len_compares = [ - new_len_acc[i - 1, update_y0 + j], - new_len_acc[i, update_y0 + j - 1], - new_len_acc[i - 1, update_y0 + j - 1], - ] - local_direction = np.argmin(compares) - new_acc[i, update_y0 + j] = local_dist + compares[local_direction] - new_len_acc[i, update_y0 + j] = 1 + len_compares[local_direction] - return new_acc, new_len_acc - - def update_accumulate_matrix(self, direction): - # local cost matrix - x, y = self.ref_pointer, self.input_pointer - wx, wy = min(self.w, x), min(self.w, y) - d = self.frame_per_seg - new_acc = np.full((wx, wy), np.inf, dtype=np.float32) - new_len_acc = np.zeros((wx, wy)) - - if direction is not Direction.TARGET: # REF, BOTH - new_acc[:-d, :] = self.acc_dist_matrix[d:] - new_len_acc[:-d, :] = self.acc_len_matrix[d:] - x_seg = self.reference_features[x - d : x] # [d, 12] - y_seg = self.input_features[y - wy : y] # [wy, 12] - dist = scipy.spatial.distance.cdist( - x_seg, y_seg, metric=self.distance_func - ) # [d, wy] - new_acc, new_len_acc = self.update_ref_direction( - dist, new_acc, new_len_acc, wx, wy, d - ) + """Calculate the offset for the current window position.""" + return np.array([self._ref_offset(), self._input_offset()]) - elif direction is not Direction.REF: # TARGET, BOTH - overlap_y = wy - d - new_acc[:, :-d] = self.acc_dist_matrix[:, -overlap_y:] - new_len_acc[:, :-d] = self.acc_len_matrix[:, -overlap_y:] - x_seg = self.reference_features[x - wx : x] # [wx, 12] - y_seg = self.input_features[y - d : y] # [d, 12] - dist = scipy.spatial.distance.cdist( - x_seg, y_seg, metric=self.distance_func - ) # [wx, d] - new_acc, new_len_acc = self.update_target_direction( - dist, new_acc, new_len_acc, wx, wy, d - ) + def _compute_cell(self, r_win, i_win): + """ + Compute DTW accumulated cost for cell (r_win, i_win) in window coordinates. - self.acc_dist_matrix = new_acc - self.acc_len_matrix = new_len_acc - - def update_acc_dist(self, local_dist, n, m): - # Initialize the first row and column - if n == 0 and m == 0: - self.acc_dist_matrix[n, m] = local_dist - self.acc_len_matrix[n, m] = 1 - elif n == 0: - self.acc_dist_matrix[n, m] = self.acc_dist_matrix[n, m - 1] + local_dist - self.acc_len_matrix[n, m] = self.acc_len_matrix[n, m - 1] + 1 - elif m == 0: - self.acc_dist_matrix[n, m] = self.acc_dist_matrix[n - 1, m] + local_dist - self.acc_len_matrix[n, m] = self.acc_len_matrix[n - 1, m] + 1 - else: - # Find the minimum accumulative distance and length from the three possible previous positions - min_dist = min( - self.acc_dist_matrix[n - 1, m], # insertion - self.acc_dist_matrix[n, m - 1], # deletion - self.acc_dist_matrix[n - 1, m - 1], # match + Uses Sakoe-Chiba symmetric weighting (DAFx 2005, Eq. 3): + D[i,j] = min(D[i-1,j] + d, D[i,j-1] + d, D[i-1,j-1] + 2d) + + Matrix convention: acc_dist_matrix[ref_win, input_win] + """ + abs_ref = self._ref_offset() + r_win + abs_input = self._input_offset() + i_win + + ref_feature = self.reference_features[abs_ref] + input_feature = self.input_features[abs_input] + + local_dist = scipy.spatial.distance.cdist( + ref_feature.reshape(1, -1), + input_feature.reshape(1, -1), + metric=self.distance_func, + )[0, 0] + + if r_win == 0 and i_win == 0: + self.acc_dist_matrix[r_win, i_win] = local_dist + elif r_win == 0: + self.acc_dist_matrix[r_win, i_win] = ( + self.acc_dist_matrix[r_win, i_win - 1] + local_dist ) - min_len = min( - self.acc_len_matrix[n - 1, m], # insertion - self.acc_len_matrix[n, m - 1], # deletion - self.acc_len_matrix[n - 1, m - 1], # match + elif i_win == 0: + self.acc_dist_matrix[r_win, i_win] = ( + self.acc_dist_matrix[r_win - 1, i_win] + local_dist ) + else: + candidates = [ + self.acc_dist_matrix[r_win - 1, i_win] + local_dist, + self.acc_dist_matrix[r_win, i_win - 1] + local_dist, + self.acc_dist_matrix[r_win - 1, i_win - 1] + 2 * local_dist, + ] + self.acc_dist_matrix[r_win, i_win] = min(candidates) + + def _accept_input(self, input_features): + """Store input features and advance input pointer.""" + self.input_features = np.vstack([self.input_features, input_features]) + self.input_pointer += self.frame_per_seg + + def _compute_input_column(self): + """Shift the window left if full, then fill the new rightmost column.""" + if self.input_pointer > self.w: + self.acc_dist_matrix[:, :-1] = self.acc_dist_matrix[:, 1:] + self.acc_dist_matrix[:, -1] = np.inf + + input_col = self._input_win_size() - 1 + for r in range(self._ref_win_size()): + self._compute_cell(r, input_col) + + def _advance_ref(self): + """Advance ref pointer, shift the window up if full, then fill the new bottom row.""" + self.ref_pointer += self.frame_per_seg - # Update the accumulative distance and length - self.acc_dist_matrix[n, m] = min_dist + local_dist - self.acc_len_matrix[n, m] = min_len + 1 - - def init_path_cost(self): - self.acc_dist_matrix = np.full( - (self.ref_pointer, self.input_pointer), np.inf, dtype=np.float32 - ) - self.acc_len_matrix = np.zeros( - (self.ref_pointer, self.input_pointer), dtype=np.float32 - ) - self.evaluate_path_cost(direction=Direction.TARGET) - - def evaluate_path_cost(self, direction): - x = self.ref_pointer - y = self.input_pointer - d = self.frame_per_seg - wx = min(self.w, x) - wy = min(self.w, y) - - if direction is not Direction.REF: - # shift all elements to the left - np.roll(self.acc_dist_matrix, shift=-1, axis=1) - self.acc_dist_matrix[:, -1] = 0 - np.roll(self.acc_len_matrix, shift=-1, axis=1) - self.acc_len_matrix[:, -1] = 0 - - x_seg = self.reference_features[x - wx : x] # [w, 12] - y_seg = self.input_features[y - d : y] # [1, 12] - dist = scipy.spatial.distance.cdist( - x_seg, y_seg, metric=self.distance_func - ) # [w, 1] - for n in range(dist.shape[0]): - for m in range(dist.shape[1]): - self.update_acc_dist(dist[n, m], n, m) - if direction is not Direction.TARGET: - # shift all elements to the bottom - np.roll(self.acc_dist_matrix, shift=-1, axis=0) - self.acc_dist_matrix[-1, :] = 0 - np.roll(self.acc_len_matrix, shift=-1, axis=0) - self.acc_len_matrix[-1, :] = 0 - - x_seg = self.reference_features[x - d : x] # [1, 12] - y_seg = self.input_features[y - wy : y] # [w, 12] - dist = scipy.spatial.distance.cdist( - x_seg, y_seg, metric=self.distance_func - ) # [1, w] - for n in range(dist.shape[0]): - for m in range(dist.shape[1]): - self.update_acc_dist(dist[n, m], n, m) - - def update_path_cost(self, direction): - self.update_accumulate_matrix(direction) - self.select_candidate() - - def select_candidate(self): - norm_x_edge = self.acc_dist_matrix[-1, :] / self.acc_len_matrix[-1, :] - norm_y_edge = self.acc_dist_matrix[:, -1] / self.acc_len_matrix[:, -1] - cat = np.concatenate((norm_x_edge, norm_y_edge)) - min_idx = np.argmin(cat) - offset = self.offset() - if min_idx <= len(norm_x_edge): - self.candidate = np.array([self.ref_pointer - offset[0], min_idx]) + if self.ref_pointer > self.w: + self.acc_dist_matrix[:-1, :] = self.acc_dist_matrix[1:, :] + self.acc_dist_matrix[-1, :] = np.inf + + ref_row = self._ref_win_size() - 1 + for c in range(self._input_win_size()): + self._compute_cell(ref_row, c) + + def _determine_direction(self): + """Apply MaxRunCount slope constraint to the suggested direction""" + suggested_direction = self.get_expand_direction() + + if self.run_count_ref >= self.max_run_count: + return Direction.TARGET + elif self.run_count_input >= self.max_run_count: + return Direction.REF else: - self.candidate = np.array( - [min_idx - len(norm_x_edge), self.input_pointer - offset[1]] - ) + return suggested_direction - def save_history(self): - new_coordinate = np.expand_dims( - self.offset() + self.candidate, axis=1 - ) # [2, 1] - self.wp = np.concatenate((self.wp, new_coordinate), axis=1) - - def select_next_direction(self): - if self.input_pointer <= self.w: - next_direction = Direction.TARGET - elif self.run_count > self.max_run_count: - next_direction = self.previous_direction.toggle() + def get_expand_direction(self): + """ + Determine expansion direction (GetInc from Dixon 2005, IJCAI Fig. 1). + + Searches the L-shaped boundary of the sliding window — the last ref row + and the last input column — for the cell with minimum normalised + accumulated cost. The direction is chosen based on where that minimum + falls: + - Corner (last row AND last col) → BOTH + - In the column (not at last row) → TARGET (advance input to catch up) + - In the row (not at last col) → REF (advance ref to catch up) + + Normalization: cost / (1 + i + j), an O(1) approximation of the true + path-length divisor (DAFx 2005, Sec. 2). + """ + ref_ws = self._ref_win_size() + input_ws = self._input_win_size() + ref_off = self._ref_offset() + input_off = self._input_offset() + + # Current row and column in window coordinates + row = ref_ws - 1 # last ref row + col = input_ws - 1 # last input column + + best_cost = np.inf + best_row = row + best_col = col + + # Search current column (last input column) across all ref rows + for r in range(ref_ws): + abs_r = ref_off + r + abs_i = input_off + col + cost = self.acc_dist_matrix[r, col] + norm_cost = cost / (1 + abs_r + abs_i) + if norm_cost < best_cost: + best_cost = norm_cost + best_row = r + best_col = col + + # Search current row (last ref row) across all input columns + for c in range(input_ws): + abs_r = ref_off + row + abs_i = input_off + c + cost = self.acc_dist_matrix[row, c] + norm_cost = cost / (1 + abs_r + abs_i) + if norm_cost < best_cost: + best_cost = norm_cost + best_row = row + best_col = c + + # Convert best window coords to absolute + self.best_ref = ref_off + best_row + self.best_input = input_off + best_col + + # Determine direction based on where the best point is + if best_row == row and best_col == col: + return Direction.BOTH + elif best_row < row: + # Best is in the column (not at current row) → input needs to catch up + return Direction.TARGET else: - offset = self.offset() - x0, y0 = offset[0], offset[1] - if self.candidate[0] == self.ref_pointer - x0: - next_direction = Direction.REF - else: - assert self.candidate[1] == self.input_pointer - y0 - next_direction = Direction.TARGET - return next_direction - - def get_new_input(self): - input_feature, f_time = self.queue.get(timeout=QUEUE_TIMEOUT) - self.last_queue_update = time.time() - self.input_features = np.vstack([self.input_features, input_feature]) - self.input_pointer += self.frame_per_seg + # Best is in the row (not at current col) → ref needs to catch up + return Direction.REF + + def save_history(self): + """Append current best alignment point to warping path.""" + new_point = np.array([[self.best_ref], [self.best_input]]) + self.wp = np.concatenate((self.wp, new_point), axis=1) + + def __call__(self, input_features: NDArray[np.float32]) -> int: + self.step(input_features) + return self.current_position + + def step(self, input_features): + """Process one input frame and update alignment state. + + Handles REF-only catch-up internally so that each call + consumes exactly one input frame. + + Parameters + ---------- + input_features : NDArray[np.float32] + A single input feature frame. + """ + # First call: initialization (matches run()'s pre-loop logic) + if not self._initialized: + self._accept_input(input_features) + self.ref_pointer += self.frame_per_seg + self._compute_cell(0, 0) + self._initialized = True + self.input_index += 1 + return + + # Phase 1: REF-only catch-up (0 or more iterations) + direction = None + while self.is_still_following(): + direction = self._determine_direction() + if direction != Direction.REF: + break + self._advance_ref() + self.run_count_ref += 1 + self.run_count_input = 0 + self.save_history() + self.current_position = self.best_ref + + if not self.is_still_following(): + return + + # Phase 2: consume input (direction is TARGET or BOTH) + self._accept_input(input_features) + self._compute_input_column() + + if direction is not Direction.TARGET: # BOTH + self._advance_ref() + + # Update run counts + if direction == Direction.TARGET: + self.run_count_input += 1 + self.run_count_ref = 0 + else: # BOTH + self.run_count_ref = 1 + self.run_count_input = 0 + + self.save_history() + self.current_position = self.best_ref + self.input_index += 1 def is_still_following(self): return self.ref_pointer <= (self.N_ref - self.frame_per_seg) @@ -374,44 +354,23 @@ def run(self, verbose=True): The warping path as a 2D array where each column contains (reference_position, input_position) """ - self.ref_pointer += self.w - self.get_new_input() - self.init_path_cost() + self.reset() if verbose: pbar = progressbar.ProgressBar(max_value=self.N_ref, redirect_stdout=True) while self.is_still_following(): - direction = self.select_next_direction() - - if direction is not Direction.REF: - self.get_new_input() - elif direction is Direction.REF: - self.ref_pointer += self.frame_per_seg - - self.update_path_cost(direction) - - if direction == self.previous_direction: - self.run_count += 1 - else: - self.run_count = 1 - - self.previous_direction = direction - updated_position = self.wp[0][-1] - self.current_position = updated_position + input_feature, f_time = self.queue.get(timeout=QUEUE_TIMEOUT) + self.last_queue_update = time.time() + self.step(input_feature) if verbose: pbar.update(int(self.current_position)) - self.save_history() - - if direction is not Direction.REF: - self.input_index += 1 - latency = time.time() - self.last_queue_update - self.latency_stats = set_latency_stats( - latency, self.latency_stats, self.input_index - ) - + latency = time.time() - self.last_queue_update + self.latency_stats = set_latency_stats( + latency, self.latency_stats, self.input_index + ) yield self.current_position if verbose: diff --git a/matchmaker/features/audio.py b/matchmaker/features/audio.py index 25b1ff7..0089fdb 100644 --- a/matchmaker/features/audio.py +++ b/matchmaker/features/audio.py @@ -21,7 +21,6 @@ NORM = np.inf FEATURES = "chroma" QUEUE_TIMEOUT = 10 -WINDOW_SIZE = 5 # Type hint for Input Audio frame. InputAudioSeries = np.ndarray @@ -198,15 +197,74 @@ def __call__( class LogSpectralEnergyProcessor(Processor): + """ + Log Spectral Energy feature processor based on Dixon (2005). + + Computes a spectral representation using a linear-log frequency scale, + then applies half-wave rectified first-order difference to emphasize + note onsets. + + The frequency axis is mapped to: + - Linear below 370 Hz + - Logarithmic spacing from 370 Hz to 12,500 Hz (49 bins) + - One bin above 12,500 Hz + """ + + LINEAR_FREQ_LIMIT = 370 # Hz (F#4) + LOG_FREQ_LIMIT = 12500 # Hz (G9) + N_LOG_BINS = 49 # paper: 84 - 34 - 1 + def __init__( self, sample_rate: int = SAMPLE_RATE, hop_length: int = HOP_LENGTH, + normalize: bool = True, ): super().__init__() self.sample_rate = sample_rate self.hop_length = hop_length self.n_fft = 2 * self.hop_length + self.normalize = normalize + self._prev_spectrum = None + + # Pre-compute frequency axis and masks + self._freqs = librosa.fft_frequencies(sr=self.sample_rate, n_fft=self.n_fft) + self._linear_mask = self._freqs <= self.LINEAR_FREQ_LIMIT + self._log_mask = (self._freqs > self.LINEAR_FREQ_LIMIT) & ( + self._freqs <= self.LOG_FREQ_LIMIT + ) + self._high_mask = self._freqs > self.LOG_FREQ_LIMIT + + # Create N+1 edges for N log-spaced bins + self._log_bin_edges = np.logspace( + np.log10(self.LINEAR_FREQ_LIMIT), + np.log10(self.LOG_FREQ_LIMIT), + num=self.N_LOG_BINS + 1, + ) + + # Pre-compute bin assignments for log frequencies + log_freqs = self._freqs[self._log_mask] + bin_idx = np.digitize(log_freqs, self._log_bin_edges) - 1 + self._log_bin_idx = np.clip(bin_idx, 0, self.N_LOG_BINS - 1) + + def reset(self): + self._prev_spectrum = None + + def _map_frequencies(self, magnitude): + """Map FFT magnitude spectrum to linear-log frequency scale.""" + linear_bins = magnitude[self._linear_mask] + + log_bins = magnitude[self._log_mask] + n_frames = magnitude.shape[1] + log_mapped = np.zeros((self.N_LOG_BINS, n_frames), dtype=np.float32) + for b in range(self.N_LOG_BINS): + mask = self._log_bin_idx == b + if np.any(mask): + log_mapped[b] = np.sum(log_bins[mask], axis=0) + + high_freq = np.sum(magnitude[self._high_mask], axis=0, keepdims=True) + + return np.vstack((linear_bins, log_mapped, high_freq)).astype(np.float32) def __call__( self, @@ -222,37 +280,28 @@ def __call__( ) magnitude = np.abs(stft_result) - freqs = librosa.fft_frequencies(sr=self.sample_rate, n_fft=self.n_fft) - - linear_limit = 370 - log_limit = 12500 - linear_bins = magnitude[freqs <= linear_limit, :] - log_bins = magnitude[(freqs > linear_limit) & (freqs <= log_limit), :] + # Map to linear-log frequency scale + feature_vector = self._map_frequencies(magnitude) - log_bin_edges = np.logspace( - np.log10(linear_limit), np.log10(log_limit), num=84 - 34 - 1 - ) - log_mapped_bins = np.zeros((len(log_bin_edges), linear_bins.shape[1])) - - for i in range(log_mapped_bins.shape[1]): - log_bin_idx = np.digitize( - freqs[(freqs > linear_limit) & (freqs <= log_limit)], log_bin_edges + # Half-wave rectified first-order difference (stateful for streaming) + if self._prev_spectrum is not None: + combined = np.hstack((self._prev_spectrum, feature_vector)) + diff = np.diff(combined, axis=1) + else: + diff = np.diff( + feature_vector, axis=1, prepend=np.zeros_like(feature_vector[:, :1]) ) - for j in range(1, len(log_bin_edges)): - log_mapped_bins[j - 1, i] = np.sum(log_bins[log_bin_idx == j, i]) - high_freq_bin = np.sum(magnitude[freqs > log_limit, :], axis=0, keepdims=True) + self._prev_spectrum = feature_vector[:, -1:] - feature_vector = np.vstack( - (linear_bins, log_mapped_bins, high_freq_bin), dtype=np.float32 - ) + result = np.maximum(diff, 0).T - diff_feature_vector = np.diff( - feature_vector, axis=0, prepend=feature_vector[0:1, :] - ) - half_wave_rectified_vector = np.maximum(diff_feature_vector, 0) + if self.normalize: + norms = np.linalg.norm(result, axis=1, keepdims=True) + norms = np.maximum(norms, 1e-10) + result = result / norms - return half_wave_rectified_vector.T + return result def compute_features_from_audio( diff --git a/matchmaker/matchmaker.py b/matchmaker/matchmaker.py index 5e3f875..ef65b39 100644 --- a/matchmaker/matchmaker.py +++ b/matchmaker/matchmaker.py @@ -356,6 +356,7 @@ def preprocess_score(self): reference_features = self.processor(self.score_audio) self.reference_features = reference_features + self.processor.reset() else: self.reference_features = self.score_part.note_array() @@ -447,7 +448,7 @@ def get_latency_stats(self): def run_evaluation( self, - perf_annotations: PathLike, + perf_annotations: Union[PathLike, np.ndarray], level: str = "beat", tolerances: list = TOLERANCES_IN_MILLISECONDS, musical_beat: bool = False, # beat annots are difference in some dataset @@ -461,8 +462,9 @@ def run_evaluation( Parameters ---------- - perf_annotations : PathLike - Path to the performance annotations file (tab-separated) + perf_annotations : PathLike or np.ndarray + Path to the performance annotations file (tab-separated), + or numpy array of annotation times in seconds. level : str Level of annotations to use: bar, beat or note tolerance : list @@ -481,8 +483,11 @@ def run_evaluation( if not self._has_run: raise ValueError("Must call run() before evaluation") + if isinstance(perf_annotations, np.ndarray): + perf_annots = perf_annotations + else: + perf_annots = np.loadtxt(fname=perf_annotations, delimiter="\t", usecols=0) score_annots = self.build_score_annotations(level, musical_beat) - perf_annots = np.loadtxt(fname=perf_annotations, delimiter="\t", usecols=0) original_perf_annots_length = len(perf_annots) min_length = min(len(score_annots), len(perf_annots)) diff --git a/matchmaker/utils/misc.py b/matchmaker/utils/misc.py index 99e1961..c845af5 100644 --- a/matchmaker/utils/misc.py +++ b/matchmaker/utils/misc.py @@ -589,7 +589,7 @@ def plot_and_save_score_following_result( if xmin <= target <= xmax and ymin <= ref <= ymax ] for ref, target in cropped_history: - plt.plot(target, ref, ".", color="lime", alpha=0.5, markersize=3) + plt.plot(target, ref, ".", color="cyan", alpha=0.5, markersize=3) # plot ground-truth labels for ref, target in zip(score_annots, perf_annots):