From 3378837a4d6c7f272957eb612a48f780f70db0d0 Mon Sep 17 00:00:00 2001 From: darth-alexus Date: Thu, 12 Feb 2026 11:37:42 +0100 Subject: [PATCH 1/2] incorporate updates into symbolic oltw dixon --- matchmaker/dp/oltw_dixon.py | 6 +++ matchmaker/matchmaker.py | 93 +++++++++++++++++++++++++++++++++---- matchmaker/utils/misc.py | 16 ++++--- 3 files changed, 99 insertions(+), 16 deletions(-) diff --git a/matchmaker/dp/oltw_dixon.py b/matchmaker/dp/oltw_dixon.py index 38905e8..8c55a05 100644 --- a/matchmaker/dp/oltw_dixon.py +++ b/matchmaker/dp/oltw_dixon.py @@ -74,6 +74,9 @@ def __init__( max_run_count=MAX_RUN_COUNT, frame_per_seg=FRAME_PER_SEG, frame_rate=FRAME_RATE, + state_to_ref_time_map = None, + ref_to_state_time_map = None, + state_space = None, **kwargs, ): super().__init__(reference_features=reference_features) @@ -84,6 +87,9 @@ def __init__( self.distance_func = distance_func.lower() self.max_run_count = max_run_count self.frame_per_seg = frame_per_seg + self.state_to_ref_time_map = state_to_ref_time_map + self.ref_to_state_time_map = ref_to_state_time_map + self.state_space = state_space self.reset() def reset(self): diff --git a/matchmaker/matchmaker.py b/matchmaker/matchmaker.py index 3c70822..f2506a6 100644 --- a/matchmaker/matchmaker.py +++ b/matchmaker/matchmaker.py @@ -5,6 +5,7 @@ import partitura from partitura.io.exportmidi import get_ppq from partitura.score import Part +from partitura.musicanalysis.performance_codec import get_time_maps_from_alignment from matchmaker.dp import OnlineTimeWarpingArzt, OnlineTimeWarpingDixon from matchmaker.features.audio import ( @@ -24,6 +25,8 @@ GaussianAudioPitchTempoHMM, PitchIOIHMM, ) +from matchmaker.utils.tempo_models import KalmanTempoModel + from matchmaker.utils.eval import ( TOLERANCES_IN_BEATS, TOLERANCES_IN_MILLISECONDS, @@ -32,7 +35,7 @@ transfer_from_score_to_predicted_perf, ) from matchmaker.utils.misc import ( - adjust_tempo_for_performance_audio, + adjust_tempo_for_performance_file, generate_score_audio, get_tempo_from_score, is_audio_file, @@ -55,6 +58,37 @@ AVAILABLE_METHODS = ["arzt", "dixon", "hmm", "pthmm"] +KWARGS = { + "audio": + {"dixon": + {"window_size": 10, + } + }, + "midi": + {"arzt": + {"processor": "pianoroll", + "piano_range": True, + }, + "dixon": + {"processor": "pianoroll", + "piano_range": True, + "window_size": 30, + }, + "hmm": + {"processor": "pitch_ioi", + "tempo_model": KalmanTempoModel, + "piano_range": True, + }, + "pthmm": + {"processor": "pitch_ioi", + "piano_range": True, + }, + "outerhmm": + {"processor": "pitch_ioi", + "piano_range": True, + }, + }, +} class Matchmaker(object): """ @@ -100,6 +134,8 @@ def __init__( frame_rate: int = FRAME_RATE, tempo: Optional[float] = None, adjust_tempo: bool = False, + kwargs = KWARGS, + unfold_score = True, ): self.score_file = str(score_file) self.performance_file = ( @@ -107,7 +143,7 @@ def __init__( ) self.input_type = input_type self.feature_type = feature_type - self.frame_rate = frame_rate + self.frame_rate = frame_rate if input_type == "audio" else 1 self.score_part: Optional[Part] = None self.distance_func = distance_func self.device_name_or_index = device_name_or_index @@ -118,14 +154,20 @@ def __init__( self._has_run = False self.method = method self.adjust_tempo = adjust_tempo + self.config = kwargs[input_type][method] # setup score file if score_file is None: raise ValueError("Score file is required") try: - self.score_part = partitura.load_score_as_part(self.score_file) - + # TODO: find a better solution: + if self.score_file.endswith('musicxml'): + self.score_part = partitura.load_musicxml(self.score_file, force_note_ids=True, ignore_invisible_objects=True) + if unfold_score: + self.score_part = partitura.score.unfold_part_maximal(self.score_part, ignore_leaps = False).parts[0] + else: + self.score_part = self.score_part.parts[0] except Exception as e: raise ValueError(f"Invalid score file: {e}") @@ -141,7 +183,7 @@ def __init__( # setup feature processor if self.feature_type is None: - self.feature_type = "chroma" if input_type == "audio" else "pitchclass" + self.feature_type = "chroma" if input_type == "audio" else "pitch_ioi" if self.feature_type == "chroma": self.processor = ChromagramProcessor( @@ -163,7 +205,7 @@ def __init__( self.processor = LogSpectralEnergyProcessor( sample_rate=sample_rate, ) - elif self.feature_type == "pitchclass": + elif self.feature_type == "pitch_ioi": self.processor = PitchIOIProcessor(piano_range=True) elif self.feature_type == "pianoroll": self.processor = PianoRollProcessor(piano_range=True) @@ -222,11 +264,16 @@ def __init__( frame_rate=self.frame_rate, ) elif method == "dixon": + state_to_ref_time_map, ref_to_state_time_map = self.get_time_maps() self.score_follower = OnlineTimeWarpingDixon( reference_features=self.reference_features, queue=self.stream.queue, distance_func=distance_func, frame_rate=self.frame_rate, + window_size=self.config["window_size"], + state_to_ref_time_map=state_to_ref_time_map, + ref_to_state_time_map=ref_to_state_time_map, + state_space=np.unique(self.score_part.note_array()["onset_beat"]) ) elif method == "hmm" and self.input_type == "midi": self.score_follower = PitchIOIHMM( @@ -256,10 +303,11 @@ def preprocess_score(self): if self.input_type == "audio": # Adjust tempo based on performance audio if requested if self.adjust_tempo and self.performance_file is not None: - self.tempo = adjust_tempo_for_performance_audio( + self.tempo = adjust_tempo_for_performance_file( self.score_part, self.performance_file, self.tempo ) - + self.ppart = partitura.utils.music.performance_from_part(self.score_part, bpm=self.tempo) # needed for time maps + self.ppart.sustain_pedal_threshold = 127 # generate score audio self.score_audio = generate_score_audio( self.score_part, self.tempo, SAMPLE_RATE @@ -269,7 +317,32 @@ def preprocess_score(self): self.reference_features = reference_features self.processor.reset() else: - self.reference_features = self.score_part.note_array() + if self.method in ["arzt", "dixon"]: + if self.performance_file is not None: + # tempo is slightly adjusted to reflect the tempo of the performance midi + self.tempo = adjust_tempo_for_performance_file( + self.score_part, self.performance_file, self.tempo + ) + self.ppart = partitura.utils.music.performance_from_part(self.score_part, bpm=self.tempo) + self.ppart.sustain_pedal_threshold = 127 + polling_period = 0.01 + self.reference_features = ( + partitura.utils.music.compute_pianoroll( + note_info=self.ppart, + time_unit="sec", + time_div=int(np.round(1 / polling_period)), + binary=True, + piano_range=True, + ) + .toarray() + .T + ).astype(np.float32) + else: + self.reference_features = self.score_part.note_array() + + def get_time_maps(self): + alignment = [{"label" : "match", "score_id" : nid, "performance_id": nid} for nid in self.score_part.note_array()["id"]] + return get_time_maps_from_alignment(self.ppart.note_array(), self.score_part.note_array(), alignment) def _convert_frame_to_beat(self, current_frame: int) -> float: """ @@ -419,7 +492,7 @@ def run_evaluation( f"Length of the annotation changed: {original_perf_annots_length} -> {len(perf_annots_predicted)}" ) - if debug: + if debug and self.input_type == "audio": save_debug_results( self.score_file, self.score_audio, diff --git a/matchmaker/utils/misc.py b/matchmaker/utils/misc.py index c845af5..e3d8710 100644 --- a/matchmaker/utils/misc.py +++ b/matchmaker/utils/misc.py @@ -14,6 +14,7 @@ from typing import Any, Dict, Iterable, List, Optional, Union import librosa +import mido import numpy as np import partitura import scipy @@ -422,25 +423,28 @@ def get_tempo_at_beat( return current_tempo -def adjust_tempo_for_performance_audio( - score: ScoreLike, performance_audio: Path, default_tempo: int = 120 +def adjust_tempo_for_performance_file( + score: ScoreLike, performance_file: Path, default_tempo: int = 120 ): """ - Adjust the tempo of the score part to match the performance audio. + Adjust the tempo of the score part to match the performance file. We round up the tempo to the nearest 20 bpm to avoid too much optimization. Parameters ---------- score : partitura.score.ScoreLike The score to adjust the tempo of. - performance_audio : Path - The performance audio file to adjust the tempo to. + performance_file : Path + The performance file to adjust the tempo to. default_tempo : int The default tempo of the score. """ score_midi = partitura.save_score_midi(score, out=None) source_length = score_midi.length - target_length = librosa.get_duration(path=str(performance_audio)) + if is_midi_file(performance_file): + target_length = mido.MidiFile(performance_file).length + else: + target_length = librosa.get_duration(path=str(performance_file)) ratio = target_length / source_length rounded_tempo = int( (default_tempo / ratio + 19) // 20 * 20 From bcbab31caca63b92eca9254f692731a8baecb9d3 Mon Sep 17 00:00:00 2001 From: darth-alexus Date: Tue, 24 Feb 2026 17:51:38 +0100 Subject: [PATCH 2/2] get dixon branch up to date with develop --- matchmaker/features/audio.py | 63 ++++- matchmaker/matchmaker.py | 435 ++++++++++++++++++++++------------- matchmaker/utils/eval.py | 186 +++++++++++---- matchmaker/utils/misc.py | 325 ++++++++++++++------------ 4 files changed, 658 insertions(+), 351 deletions(-) diff --git a/matchmaker/features/audio.py b/matchmaker/features/audio.py index 0089fdb..3a9c27b 100644 --- a/matchmaker/features/audio.py +++ b/matchmaker/features/audio.py @@ -20,7 +20,7 @@ DCT_TYPE = 2 NORM = np.inf FEATURES = "chroma" -QUEUE_TIMEOUT = 10 +QUEUE_TIMEOUT = 1 # Type hint for Input Audio frame. InputAudioSeries = np.ndarray @@ -159,10 +159,69 @@ def __call__( hop_length=self.hop_length, norm=self.norm, dtype=np.float32, + fmin=librosa.note_to_hz("A0"), + n_bins=88, ) return np.abs(cqt).T[1:-1] +class CQTSpectralFluxProcessor(Processor): + """ + CQT spectrum (88 bins, A0-C8) with optional half-wave rectified spectral flux. + Output shape: (n_frames, 88) or (n_frames, 89) if include_spectral_flux=True. + """ + + def __init__( + self, + sample_rate: int = SAMPLE_RATE, + hop_length: int = HOP_LENGTH, + norm: Optional[Union[float, str]] = NORM, + fmin: Optional[float] = None, + n_bins: int = 88, + bins_per_octave: int = 12, + include_spectral_flux: bool = True, + ): + super().__init__() + self.sample_rate = sample_rate + self.hop_length = hop_length + self.norm = norm + self.fmin = fmin if fmin is not None else librosa.note_to_hz("A0") + self.n_bins = n_bins + self.bins_per_octave = bins_per_octave + self.include_spectral_flux = include_spectral_flux + self.prev_magnitude = None + + def __call__( + self, + y: InputAudioSeries, + ) -> Tuple[Optional[np.ndarray], Dict]: + cqt = librosa.cqt( + y=y, + sr=self.sample_rate, + hop_length=self.hop_length, + fmin=self.fmin, + n_bins=self.n_bins, + bins_per_octave=self.bins_per_octave, + norm=self.norm, + dtype=np.float32, + ) + cqt_features = np.abs(cqt).T + + if self.include_spectral_flux: + if self.prev_magnitude is None: + spectral_flux = np.zeros((cqt_features.shape[0], 1), dtype=np.float32) + else: + diff = np.maximum(cqt_features - self.prev_magnitude, 0) + spectral_flux = np.sum(diff, axis=1, keepdims=True) + + self.prev_magnitude = cqt_features.copy() + features = np.hstack([cqt_features, spectral_flux]) + else: + features = cqt_features + + return features[1:-1] + + class MelSpectrogramProcessor(Processor): def __init__( self, @@ -318,6 +377,8 @@ def compute_features_from_audio( "mel": MelSpectrogramProcessor, "mfcc": MFCCProcessor, "log_spectral": LogSpectralEnergyProcessor, + "cqt": CQTProcessor, + "cqt_spectral_flux": CQTSpectralFluxProcessor, } feature_processor = processor_mapping[processor_name]( diff --git a/matchmaker/matchmaker.py b/matchmaker/matchmaker.py index f2506a6..776e081 100644 --- a/matchmaker/matchmaker.py +++ b/matchmaker/matchmaker.py @@ -1,10 +1,12 @@ import os +import sys +from pathlib import Path from typing import Optional, Union import numpy as np import partitura from partitura.io.exportmidi import get_ppq -from partitura.score import Part +from partitura.score import Part, merge_parts from partitura.musicanalysis.performance_codec import get_time_maps_from_alignment from matchmaker.dp import OnlineTimeWarpingArzt, OnlineTimeWarpingDixon @@ -13,20 +15,26 @@ SAMPLE_RATE, ChromagramProcessor, CQTProcessor, + CQTSpectralFluxProcessor, LogSpectralEnergyProcessor, MelSpectrogramProcessor, MFCCProcessor, ) -from matchmaker.features.midi import PianoRollProcessor, PitchIOIProcessor +from matchmaker.features.midi import ( + PianoRollProcessor, + PitchClassPianoRollProcessor, + PitchIOIProcessor, +) from matchmaker.io.audio import AudioStream from matchmaker.io.midi import MidiStream from matchmaker.prob.hmm import ( GaussianAudioPitchHMM, GaussianAudioPitchTempoHMM, + PitchHMM, PitchIOIHMM, ) -from matchmaker.utils.tempo_models import KalmanTempoModel - +from matchmaker.prob.outer_product_hmm import OuterProductHMM +from matchmaker.prob.outer_product_hmm_audio import AudioOuterProductHMM from matchmaker.utils.eval import ( TOLERANCES_IN_BEATS, TOLERANCES_IN_MILLISECONDS, @@ -42,54 +50,73 @@ is_midi_file, save_debug_results, ) +from matchmaker.utils.tempo_models import KalmanTempoModel + +sys.setrecursionlimit(10_000) PathLike = Union[str, bytes, os.PathLike] DEFAULT_TEMPO = 120 + + DEFAULT_DISTANCE_FUNCS = { "arzt": OnlineTimeWarpingArzt.DEFAULT_DISTANCE_FUNC, "dixon": OnlineTimeWarpingDixon.DEFAULT_DISTANCE_FUNC, "hmm": None, + "outerhmm": None, + "audio_outerhmm": None, + "pthmm": None, } DEFAULT_METHODS = { "audio": "arzt", - "midi": "hmm", + "midi": "outerhmm", } -AVAILABLE_METHODS = ["arzt", "dixon", "hmm", "pthmm"] - +AVAILABLE_METHODS = ["arzt", "dixon", "hmm", "pthmm", "outerhmm", "audio_outerhmm"] KWARGS = { - "audio": - {"dixon": - {"window_size": 10, - } + "audio": { + "dixon": { + "window_size": 10, + }, + "arzt": { + "window_size": 5, + "start_window_size": 0.25, + "step_size" : 5,}, + "audio_outerhmm": { + "sample_rate": 16000, + "frame_rate": 50, + }, + }, + "midi": { + "arzt": { + "processor": "pianoroll", + "piano_range": True, + "window_size": 200, + "start_window_size": 200, + "step_size": 5, + }, + "dixon": { + "processor": "pianoroll", + "piano_range": True, + "window_size": 30, + }, + "hmm": { + "processor": "pitch_ioi", + "tempo_model": KalmanTempoModel, + "piano_range": True, + }, + "pthmm": { + "processor": "pitch_ioi", + "piano_range": True, }, - "midi": - {"arzt": - {"processor": "pianoroll", - "piano_range": True, - }, - "dixon": - {"processor": "pianoroll", - "piano_range": True, - "window_size": 30, - }, - "hmm": - {"processor": "pitch_ioi", - "tempo_model": KalmanTempoModel, - "piano_range": True, - }, - "pthmm": - {"processor": "pitch_ioi", - "piano_range": True, - }, - "outerhmm": - {"processor": "pitch_ioi", - "piano_range": True, - }, + "outerhmm": { + "processor": "pitch_ioi", + "piano_range": True, }, + }, } + class Matchmaker(object): """ A class to perform online score following with I/O support for audio and MIDI @@ -133,17 +160,22 @@ def __init__( sample_rate: int = SAMPLE_RATE, frame_rate: int = FRAME_RATE, tempo: Optional[float] = None, - adjust_tempo: bool = False, - kwargs = KWARGS, - unfold_score = True, + kwargs=KWARGS, + unfold_score=True, + auto_adjust_tempo: bool = False, ): self.score_file = str(score_file) self.performance_file = ( str(performance_file) if performance_file is not None else None ) + + # if input_type not in ("audio", "midi"): + # raise ValueError(f"Invalid input_type {input_type}") self.input_type = input_type self.feature_type = feature_type self.frame_rate = frame_rate if input_type == "audio" else 1 + self.sample_rate = sample_rate + self.hop_length = sample_rate // self.frame_rate self.score_part: Optional[Part] = None self.distance_func = distance_func self.device_name_or_index = device_name_or_index @@ -152,69 +184,94 @@ def __init__( self.score_follower = None self.reference_features = None self._has_run = False + + # validate method first + if method is None: + method = DEFAULT_METHODS[self.input_type] + elif method not in AVAILABLE_METHODS: + raise ValueError(f"Invalid method. Available methods: {AVAILABLE_METHODS}") + self.method = method - self.adjust_tempo = adjust_tempo - self.config = kwargs[input_type][method] + self.config = kwargs[self.input_type][self.method] + self.auto_adjust_tempo = auto_adjust_tempo - # setup score file - if score_file is None: - raise ValueError("Score file is required") + # Apply method-specific defaults from config (only if not explicitly provided by caller) + if sample_rate == SAMPLE_RATE and "sample_rate" in self.config: + self.sample_rate = self.config["sample_rate"] + if frame_rate == FRAME_RATE and "frame_rate" in self.config: + self.frame_rate = self.config["frame_rate"] + self.hop_length = self.sample_rate // self.frame_rate + # setup score file try: - # TODO: find a better solution: - if self.score_file.endswith('musicxml'): - self.score_part = partitura.load_musicxml(self.score_file, force_note_ids=True, ignore_invisible_objects=True) - if unfold_score: - self.score_part = partitura.score.unfold_part_maximal(self.score_part, ignore_leaps = False).parts[0] - else: - self.score_part = self.score_part.parts[0] + ext = Path(self.score_file).suffix.lower() + if ext in (".musicxml", ".xml", ".mxl"): + score = partitura.load_musicxml( + self.score_file, ignore_invisible_objects=True + ) + else: + score = partitura.load_score(self.score_file) + + if unfold_score: + score = partitura.score.unfold_part_maximal(score, ignore_leaps=False) + self.score_part = merge_parts(score.parts) except Exception as e: raise ValueError(f"Invalid score file: {e}") - # Set tempo: user-provided > score marking > default (120 BPM) - # _user_specified_tempo: if True, use uniform tempo; if False, use score tempo map + # Set tempo: user-provided > adjust_tempo (always 120) > score marking > default (120 BPM) if tempo is not None: self.tempo = float(tempo) - self._user_specified_tempo = True + elif auto_adjust_tempo: + self.tempo = DEFAULT_TEMPO else: - self._user_specified_tempo = False score_tempo = get_tempo_from_score(self.score_part, self.score_file) self.tempo = score_tempo if score_tempo is not None else DEFAULT_TEMPO # setup feature processor if self.feature_type is None: - self.feature_type = "chroma" if input_type == "audio" else "pitch_ioi" + if input_type == "audio": + self.feature_type = ( + "cqt_spectral_flux" if method == "audio_outerhmm" else "chroma" + ) + else: + self.feature_type = "pitch_ioi" if self.feature_type == "chroma": self.processor = ChromagramProcessor( - sample_rate=sample_rate, + sample_rate=self.sample_rate, + hop_length=self.hop_length, ) elif self.feature_type == "mfcc": self.processor = MFCCProcessor( - sample_rate=sample_rate, + sample_rate=self.sample_rate, ) elif self.feature_type == "cqt": self.processor = CQTProcessor( - sample_rate=sample_rate, + sample_rate=self.sample_rate, ) elif self.feature_type == "mel": self.processor = MelSpectrogramProcessor( - sample_rate=sample_rate, + sample_rate=self.sample_rate, ) elif self.feature_type == "lse": self.processor = LogSpectralEnergyProcessor( - sample_rate=sample_rate, + sample_rate=self.sample_rate, ) elif self.feature_type == "pitch_ioi": - self.processor = PitchIOIProcessor(piano_range=True) + self.processor = PitchIOIProcessor(piano_range=self.config["piano_range"]) + elif self.feature_type == "pitchclass": + self.processor = PitchClassPianoRollProcessor() elif self.feature_type == "pianoroll": - self.processor = PianoRollProcessor(piano_range=True) + self.processor = PianoRollProcessor(piano_range=self.config["piano_range"]) + elif self.feature_type == "cqt_spectral_flux": + self.processor = CQTSpectralFluxProcessor( + sample_rate=self.sample_rate, + hop_length=self.hop_length, + ) else: - raise ValueError("Invalid feature type") + raise ValueError(f"Invalid feature type `{self.feature_type}`") - # validate performance file and input_type if self.performance_file is not None: - # check performance file type matches input type if self.input_type == "audio" and not is_audio_file(self.performance_file): raise ValueError( f"Invalid performance file. Expected audio file, but got {self.performance_file}" @@ -224,44 +281,49 @@ def __init__( f"Invalid performance file. Expected MIDI file, but got {self.performance_file}" ) + # setup distance function + if distance_func is None: + distance_func = DEFAULT_DISTANCE_FUNCS[self.method] # setup stream device + if self.input_type == "audio": self.stream = AudioStream( processor=self.processor, device_name_or_index=self.device_name_or_index, file_path=self.performance_file, wait=wait, - target_sr=SAMPLE_RATE, + target_sr=self.sample_rate, + sample_rate=self.sample_rate, + hop_length=self.hop_length, ) elif self.input_type == "midi": self.stream = MidiStream( processor=self.processor, port=self.device_name_or_index, file_path=self.performance_file, + **({"polling_period": None} if method == "outerhmm" else {}), ) else: - raise ValueError("Invalid input type") + raise ValueError(f"Invalid input type {self.input_type}") - # preprocess score (setting reference features, tempo) - self.preprocess_score() - - # validate method first - if method is None: - method = DEFAULT_METHODS[self.input_type] - elif method not in AVAILABLE_METHODS: - raise ValueError(f"Invalid method. Available methods: {AVAILABLE_METHODS}") + self.reference_features = self.preprocess_score() - # setup distance function if distance_func is None: distance_func = DEFAULT_DISTANCE_FUNCS[method] - # setup score follower if method == "arzt": + state_to_ref_time_map, ref_to_state_time_map = self.get_time_maps() self.score_follower = OnlineTimeWarpingArzt( reference_features=self.reference_features, queue=self.stream.queue, distance_func=distance_func, frame_rate=self.frame_rate, + window_size=self.config["window_size"], + start_window_size=self.config["start_window_size"], + state_to_ref_time_map=state_to_ref_time_map, + ref_to_state_time_map=ref_to_state_time_map, + step_size=self.config["step_size"], + state_space=np.unique(self.score_part.note_array()["onset_beat"]) ) elif method == "dixon": state_to_ref_time_map, ref_to_state_time_map = self.get_time_maps() @@ -279,67 +341,72 @@ def __init__( self.score_follower = PitchIOIHMM( reference_features=self.reference_features, queue=self.stream.queue, + tempo_model=self.config["tempo_model"], + has_insertions=True, + piano_range=self.config["piano_range"], ) - elif method == "hmm" and self.input_type == "audio": - # state_space = self._convert_frame_to_beat(np.arange(len(self.reference_features))) - self.score_follower = GaussianAudioPitchHMM( + elif method == "pthmm" and self.input_type == "audio": + self.score_follower = GaussianAudioPitchTempoHMM( reference_features=self.reference_features, queue=self.stream.queue, - # state_space=state_space, - # patience=50, ) - - elif method == "pthmm" and self.input_type == "audio": - self.score_follower = GaussianAudioPitchTempoHMM( + elif method == "audio_outerhmm" and self.input_type == "audio": + self.score_follower = AudioOuterProductHMM( + reference_features=self.reference_features, + queue=self.stream.queue, + tempo=self.tempo, + sample_rate=self.sample_rate, + hop_length=self.hop_length, + ) + elif method == "pthmm" and self.input_type == "midi": + self.score_follower = PitchHMM( + reference_features=self.reference_features, + queue=self.stream.queue, + has_insertions=True, + piano_range=self.config["piano_range"], + ) + elif method == "outerhmm" and self.input_type == "midi": + self.score_follower = OuterProductHMM( reference_features=self.reference_features, - # observation_model=obs_model, queue=self.stream.queue, - # pitch_precision=0.5, - # ioi_precision=2, - transition_scale=0.05, ) + else: + raise ValueError("Invalid method") def preprocess_score(self): - if self.input_type == "audio": - # Adjust tempo based on performance audio if requested - if self.adjust_tempo and self.performance_file is not None: - self.tempo = adjust_tempo_for_performance_file( - self.score_part, self.performance_file, self.tempo - ) - self.ppart = partitura.utils.music.performance_from_part(self.score_part, bpm=self.tempo) # needed for time maps + """Preprocess score to extract reference features.""" + if self.auto_adjust_tempo and self.performance_file is not None: + self.tempo = adjust_tempo_for_performance_file( + self.score_part, self.performance_file, self.tempo + ) + + if self.method in {"arzt", "dixon"}: + self.ppart = partitura.utils.music.performance_from_part(self.score_part, bpm=self.tempo) self.ppart.sustain_pedal_threshold = 127 - # generate score audio - self.score_audio = generate_score_audio( - self.score_part, self.tempo, SAMPLE_RATE - ).astype(np.float32) - - reference_features = self.processor(self.score_audio) - self.reference_features = reference_features - self.processor.reset() - else: - if self.method in ["arzt", "dixon"]: - if self.performance_file is not None: - # tempo is slightly adjusted to reflect the tempo of the performance midi - self.tempo = adjust_tempo_for_performance_file( - self.score_part, self.performance_file, self.tempo - ) - self.ppart = partitura.utils.music.performance_from_part(self.score_part, bpm=self.tempo) - self.ppart.sustain_pedal_threshold = 127 + if self.input_type == "audio": + self.score_audio = generate_score_audio( + self.score_part, self.tempo, self.sample_rate + ).astype(np.float32) + reference_features = self.processor(self.score_audio) + self.processor.reset() + return reference_features + else: polling_period = 0.01 - self.reference_features = ( + reference_features = ( partitura.utils.music.compute_pianoroll( note_info=self.ppart, time_unit="sec", time_div=int(np.round(1 / polling_period)), binary=True, - piano_range=True, + piano_range=self.config["piano_range"], ) .toarray() .T ).astype(np.float32) - else: - self.reference_features = self.score_part.note_array() - + return reference_features + else: + return self.score_part.note_array() + def get_time_maps(self): alignment = [{"label" : "match", "score_id" : nid, "performance_id": nid} for nid in self.score_part.note_array()["id"]] return get_time_maps_from_alignment(self.ppart.note_array(), self.score_part.note_array(), alignment) @@ -363,26 +430,58 @@ def _convert_frame_to_beat(self, current_frame: int) -> float: ) return beat_position - def build_score_annotations(self, level="beat", musical_beat: bool = False): + def build_score_annotations( + self, + level="beat", + musical_beat: bool = False, + return_type: str = "beats", # "beat" or "seconds" + ): + """ + Build score annotations in beat or second unit. + + Parameters + ---------- + level : str + Level of annotations to use: beat or note (chord onset level) + musical_beat : bool + Whether to use musical beat + return_type : {"beat", "seconds"} + Type of annotations to return: beat or seconds (time unit) + + Returns + ------- + score_annots : np.ndarray + Array of score annotations in beat or second unit + """ score_annots = [] - if level == "beat": # TODO: add bar-level, note-level + if level == "beat": if musical_beat: self.score_part.use_musical_beat() # for asap dataset note_array = np.unique(self.score_part.note_array()["onset_beat"]) start_beat = np.ceil(note_array.min()) end_beat = np.floor(note_array.max()) - self.beats = np.arange(start_beat, end_beat + 1) + score_annots_in_beat = np.arange(start_beat, end_beat + 1) + elif level == "note": + snote_array = self.score_part.note_array() + score_annots_in_beat = np.unique(snote_array["onset_beat"]) + else: + raise ValueError(f"Invalid score annotation level: {level}") - beat_timestamp = [ + if return_type == "beats": + return score_annots_in_beat + elif return_type == "seconds": + score_annots_in_seconds = [ self.score_part.inv_beat_map(beat) / self.score_part.quarter_duration_map( self.score_part.inv_beat_map(beat) ) * (60 / self.tempo) - for beat in self.beats + for beat in score_annots_in_beat ] + return np.array(score_annots_in_seconds) + else: + raise ValueError(f"Invalid return type: {return_type}") - score_annots = np.array(beat_timestamp) return score_annots def convert_timestamps_to_beats(self, timestamps): @@ -433,13 +532,13 @@ def get_latency_stats(self): def run_evaluation( self, perf_annotations: Union[PathLike, np.ndarray], - level: str = "beat", + level: str = "note", tolerances: list = TOLERANCES_IN_MILLISECONDS, musical_beat: bool = False, # beat annots are difference in some dataset debug: bool = False, save_dir: PathLike = None, run_name: str = None, - in_seconds: bool = True, # 'True' for performance-based, 'False' for score-based + domain: str = "performance", # "score" or "performance" ) -> dict: """ Evaluate the score following process @@ -455,8 +554,9 @@ def run_evaluation( Tolerances to use for evaluation (in milliseconds) debug : bool Whether to save the score and performance audio with beat annotations - axis : str - Evaluation axis, either 'score' or 'performance' + domain : str + Evaluation domain, either "score" or "performance". + "score" domain evaluates in beat unit, "performance" domain evaluates in second unit. (Default: "performance") Returns ------- @@ -471,66 +571,83 @@ def run_evaluation( perf_annots = perf_annotations else: perf_annots = np.loadtxt(fname=perf_annotations, delimiter="\t", usecols=0) - score_annots = self.build_score_annotations(level, musical_beat) - original_perf_annots_length = len(perf_annots) + + return_type = "seconds" if domain == "performance" else "beats" + score_annots = self.build_score_annotations(level, musical_beat, return_type) + + original_perf_annots_counts = len(perf_annots) min_length = min(len(score_annots), len(perf_annots)) score_annots = score_annots[:min_length] perf_annots = perf_annots[:min_length] + mode = ( + "state" + if (self.input_type == "midi" or self.method == "audio_outerhmm") + else "frame" + ) perf_annots_predicted = transfer_from_score_to_predicted_perf( - self.score_follower.warping_path, score_annots, frame_rate=self.frame_rate + self.score_follower.warping_path, + score_annots, + frame_rate=self.frame_rate, + mode=mode, ) score_annots_predicted = transfer_from_perf_to_predicted_score( - self.score_follower.warping_path, perf_annots, frame_rate=self.frame_rate + self.score_follower.warping_path, + perf_annots, + frame_rate=self.frame_rate, + mode=mode, ) score_annots = score_annots[: len(score_annots_predicted)] - if original_perf_annots_length != len(perf_annots_predicted): + if original_perf_annots_counts != len(perf_annots_predicted): print( - f"Length of the annotation changed: {original_perf_annots_length} -> {len(perf_annots_predicted)}" + f"Length of the annotation changed: {original_perf_annots_counts} -> {len(perf_annots_predicted)}" ) - if debug and self.input_type == "audio": - save_debug_results( - self.score_file, - self.score_audio, - score_annots, - score_annots_predicted, - self.performance_file, - perf_annots, - perf_annots_predicted, - self.score_follower, - self.frame_rate, - save_dir, - run_name, - ) - - if in_seconds: + # Evaluation metrics + if domain == "performance": eval_results = get_evaluation_results( perf_annots, perf_annots_predicted, - total_length=original_perf_annots_length, + total_counts=original_perf_annots_counts, tolerances=tolerances, ) else: - score_annots = self.beats score_annots_predicted = self.convert_timestamps_to_beats( score_annots_predicted ) if tolerances == TOLERANCES_IN_MILLISECONDS: - tolerances = TOLERANCES_IN_BEATS # switch to beats + tolerances = TOLERANCES_IN_BEATS eval_results = get_evaluation_results( score_annots, score_annots_predicted, - total_length=original_perf_annots_length, + total_counts=original_perf_annots_counts, tolerances=tolerances, in_seconds=False, ) + if self.input_type == "audio": + latency_results = self.get_latency_stats() + eval_results.update(latency_results) + + # Debug: save warping path TSV, results JSON, and plots + if debug and save_dir is not None: + save_debug_results( + warping_path=self.score_follower.warping_path, + score_annots=score_annots, + perf_annots=perf_annots, + perf_annots_predicted=perf_annots_predicted, + eval_results=eval_results, + frame_rate=self.frame_rate, + save_dir=save_dir, + run_name=run_name or "results", + state_space=getattr(self.score_follower, "state_space", None), + ref_features=getattr(self.score_follower, "reference_features", None), + input_features=getattr(self.score_follower, "input_features", None), + distance_func=getattr(self.score_follower, "distance_func", None), + ) - latency_results = self.get_latency_stats() - eval_results.update(latency_results) return eval_results def run(self, verbose: bool = True, wait: bool = True): @@ -548,12 +665,12 @@ def run(self, verbose: bool = True, wait: bool = True): Alignment results with warping path """ with self.stream: - for current_frame in self.score_follower.run(verbose=verbose): - if self.input_type == "audio": - position_in_beat = self._convert_frame_to_beat(current_frame) + for current_position in self.score_follower.run(verbose=verbose): + if self.input_type == "audio" and self.method != "audio_outerhmm": + position_in_beat = self._convert_frame_to_beat(current_position) yield position_in_beat else: - yield float(self.score_follower.state_space[current_frame]) + yield float(self.score_follower.state_space[current_position]) self._has_run = True return self.score_follower.warping_path diff --git a/matchmaker/utils/eval.py b/matchmaker/utils/eval.py index b7009ed..20e5192 100644 --- a/matchmaker/utils/eval.py +++ b/matchmaker/utils/eval.py @@ -1,4 +1,4 @@ -from typing import TypedDict +from typing import TypedDict, Union import numpy as np import scipy @@ -7,65 +7,171 @@ TOLERANCES_IN_BEATS = [0.05, 0.1, 0.3, 0.5, 1, 2] -def transfer_positions(wp, ref_anns, frame_rate, reverse=False): +def transfer_positions( + wp, + ref_anns, + frame_rate, + reverse=False, + *, + mode: str = "auto", + reducer: str = "min", + state_offset: Union[int, str] = "auto", + output: str = "seconds", +): """ Transfer the positions of the reference annotations to the target annotations using the warping path. + + This function supports two common warping-path conventions: + + - **frame mode** (classic DTW-style): wp[0] and wp[1] are frame indices for reference/target features. + - **state mode** (HMM/score-state): wp[0] contains *reference state indices* and wp[1] contains *target frame indices*. + Parameters ---------- wp : np.array with shape (2, T) array of warping path. warping_path[0] is the index of the reference (score) feature and warping_path[1] is the index of the target(input) feature. ref_ann : List[float] - reference annotations in seconds. + In **frame mode**, reference annotations in seconds. + In **state mode**, a sequence whose length equals the number of reference states (e.g., score unique_onsets); + the values are not used except for determining the number of states. frame_rate : int frame rate of the audio. + reverse : bool + If True, swap the direction (target -> reference). + mode : {"auto", "frame", "state"} + Warping-path convention. "auto" picks "state" when wp[0] looks like small discrete state indices. + reducer : {"min", "max", "median", "mean"} + In **state mode**, how to select a single representative target frame for each state when multiple wp entries + map to the same state. + state_offset : {"auto"} or int + In **state mode**, wp[0] may start at 0 or 1 (or have a leading start-state). "auto" chooses the offset that + best matches the expected number of states. + output : {"seconds", "frames"} + Return unit. "seconds" divides frames by frame_rate; "frames" returns frame indices. Returns ------- predicted_targets : np.array with shape (T,) - predicted target positions in seconds. + Predicted target positions (seconds or frames depending on output). """ - # Causal nearest neighbor interpolation + if output not in {"seconds", "frames"}: + raise ValueError(f"Invalid output={output!r}. Use 'seconds' or 'frames'.") + if reverse: x, y = wp[1], wp[0] else: x, y = wp[0], wp[1] - ref_anns_frame = np.round(ref_anns * frame_rate) - predicted_targets = np.ones(len(ref_anns)) * np.nan - - for i, r in enumerate(ref_anns_frame): - # 1) Scan all x values less than or equal to r and find the largest x value - past_indices = np.where(x <= r)[0] - if past_indices.size > 0: - # Find indices corresponding to the largest x value - max_x_val = x[past_indices[-1]] - max_x_indices = np.where(x == max_x_val)[0] - - # 2) Among all y values mapped to this x value, select the minimum y value - corresponding_y_values = y[max_x_indices] - min_y_val = np.min(corresponding_y_values) - - # predicted_targets.append(min_y_val) - predicted_targets[i] = min_y_val - return np.array(predicted_targets) / frame_rate - - -def transfer_from_score_to_predicted_perf(wp, score_annots, frame_rate): - predicted_perf_idx = transfer_positions(wp, score_annots, frame_rate) + if mode not in {"auto", "frame", "state"}: + raise ValueError(f"Invalid mode={mode!r}. Use 'auto', 'frame', or 'state'.") + + # Heuristic: state paths have small discrete indices (often << target frames), + # while frame paths typically cover most reference frames (unique count is large). + if mode == "auto": + x_unique = np.unique(x) + n_ref = len(ref_anns) + looks_like_state = (x_unique.size <= max(4, 2 * n_ref)) and ( + int(np.max(x)) <= max(10, 5 * n_ref) + ) + mode = "state" if looks_like_state else "frame" + + if mode == "frame": + # Causal nearest neighbor interpolation (reference seconds -> reference frames -> target frames) + ref_anns_frame = np.round(np.asarray(ref_anns) * frame_rate) + predicted_targets = np.ones(len(ref_anns_frame), dtype=float) * np.nan + + for i, r in enumerate(ref_anns_frame): + # 1) Scan all x values less than or equal to r and find the largest x value + past_indices = np.where(x <= r)[0] + if past_indices.size > 0: + # Find indices corresponding to the largest x value + max_x_val = x[past_indices[-1]] + max_x_indices = np.where(x == max_x_val)[0] + + # 2) Among all y values mapped to this x value, select the minimum y value + corresponding_y_values = y[max_x_indices] + predicted_targets[i] = float(np.min(corresponding_y_values)) + + if output == "frames": + return predicted_targets + return np.asarray(predicted_targets) / frame_rate + + # mode == "state" + # Goal: for each reference state index, select representative target frame from wp. + num_states = len(ref_anns) + predicted_frames = np.ones(num_states, dtype=float) * np.nan + + x_int = np.asarray(x, dtype=int) + y_int = np.asarray(y, dtype=int) + + if reducer not in {"min", "max", "median", "mean"}: + raise ValueError( + f"Invalid reducer={reducer!r}. Use 'min', 'max', 'median', or 'mean'." + ) + + if state_offset == "auto": + # Choose offset that maximizes overlap between expected states and observed wp state indices. + observed = np.unique(x_int) + candidates = [] + for off in (0, 1, int(np.min(x_int))): + if off not in candidates: + candidates.append(off) + best_off = candidates[0] + best_overlap = -1 + for off in candidates: + expected = np.arange(off, off + num_states, dtype=int) + overlap = np.intersect1d(observed, expected).size + if overlap > best_overlap: + best_overlap = overlap + best_off = off + offset = best_off + else: + offset = int(state_offset) + + for s in range(num_states): + wp_state = s + offset + idx = np.where(x_int == wp_state)[0] + if idx.size == 0: + continue + vals = y_int[idx].astype(float) + if reducer == "min": + predicted_frames[s] = float(np.min(vals)) + elif reducer == "max": + predicted_frames[s] = float(np.max(vals)) + elif reducer == "median": + predicted_frames[s] = float(np.median(vals)) + else: # mean + predicted_frames[s] = float(np.mean(vals)) + + if output == "frames": + return predicted_frames + return predicted_frames / frame_rate + + +def transfer_from_score_to_predicted_perf(wp, score_annots, frame_rate, mode="auto"): + predicted_perf_idx = transfer_positions( + wp, + score_annots, + frame_rate, + mode=mode, + ) return predicted_perf_idx -def transfer_from_perf_to_predicted_score(wp, perf_annots, frame_rate): - predicted_score_idx = transfer_positions(wp, perf_annots, frame_rate, reverse=True) +def transfer_from_perf_to_predicted_score(wp, perf_annots, frame_rate, mode="auto"): + predicted_score_idx = transfer_positions( + wp, perf_annots, frame_rate, reverse=True, mode=mode + ) return predicted_score_idx def get_evaluation_results( gt_annots, predicted_annots, - total_length, + total_counts, tolerances=TOLERANCES_IN_MILLISECONDS, + pcr_threshold=2_000, # 2 seconds in_seconds=True, ): if in_seconds: @@ -73,9 +179,7 @@ def get_evaluation_results( else: errors_in_delay = gt_annots - predicted_annots - filtered_errors_in_delay = errors_in_delay[ - np.abs(errors_in_delay) <= tolerances[-1] - ] + filtered_errors_in_delay = errors_in_delay[np.abs(errors_in_delay) <= pcr_threshold] filtered_abs_errors_in_delay = np.abs(filtered_errors_in_delay) results = { @@ -85,16 +189,18 @@ def get_evaluation_results( "skewness": float(f"{scipy.stats.skew(filtered_errors_in_delay):.4f}"), "kurtosis": float(f"{scipy.stats.kurtosis(filtered_errors_in_delay):.4f}"), } - for tau in tolerances: - if in_seconds: + + if in_seconds: + for tau in tolerances: results[f"{tau}ms"] = float( - f"{np.sum(np.abs(errors_in_delay) <= tau) / total_length:.4f}" + f"{np.sum(np.abs(errors_in_delay) <= tau) / total_counts:.4f}" ) - else: + else: + for tau in tolerances: results[f"{tau}b"] = float( - f"{np.sum(np.abs(errors_in_delay) <= tau) / total_length:.4f}" + f"{np.sum(np.abs(errors_in_delay) <= tau) / total_counts:.4f}" ) + + results["pcr"] = float(f"{len(filtered_errors_in_delay) / total_counts:.4f}") results["count"] = len(filtered_abs_errors_in_delay) - pcr_threshold = f"{tolerances[-1]}ms" if in_seconds else f"{tolerances[-1]}b" - results["pcr"] = results[f"{pcr_threshold}"] return results diff --git a/matchmaker/utils/misc.py b/matchmaker/utils/misc.py index e3d8710..ce58269 100644 --- a/matchmaker/utils/misc.py +++ b/matchmaker/utils/misc.py @@ -6,7 +6,6 @@ import csv import numbers -import os import re import xml.etree.ElementTree as ET from pathlib import Path @@ -18,13 +17,10 @@ import numpy as np import partitura import scipy -import soundfile as sf from matplotlib import pyplot as plt from numpy.typing import NDArray from partitura.score import ScoreLike -from matchmaker.features.audio import SAMPLE_RATE - # Tempo marking to BPM mapping # Reference: https://en.wikipedia.org/wiki/Tempo#Basic_tempo_markings TEMPO_MARKING_TO_BPM = { @@ -325,6 +321,7 @@ def get_tempo_from_score( Tries multiple sources in order: 1. Partitura Tempo objects (explicit BPM) 2. MusicXML element (if score_file provided) + 3. Text tempo marking (e.g., "Allegro", "Andante") converted to approximate BPM Parameters ---------- @@ -363,6 +360,12 @@ def get_tempo_from_score( except Exception: pass + # Fallback: extract from text tempo marking (e.g., "Allegro", "Andante") + if score_file is not None: + text_tempo = extract_tempo_marking_from_musicxml(score_file) + if text_tempo is not None: + return text_tempo + return None @@ -444,7 +447,7 @@ def adjust_tempo_for_performance_file( if is_midi_file(performance_file): target_length = mido.MidiFile(performance_file).length else: - target_length = librosa.get_duration(path=str(performance_file)) + target_length = librosa.get_duration(path=str(performance_file)) ratio = target_length / source_length rounded_tempo = int( (default_tempo / ratio + 19) // 20 * 20 @@ -518,161 +521,181 @@ def save_nparray_to_csv(array: NDArray, save_path: str): writer.writerows(array) -def save_mixed_audio( - audio: Union[np.ndarray, str, os.PathLike], - annots: np.ndarray, - save_path: Union[str, os.PathLike], - sr: int = SAMPLE_RATE, -): - if not isinstance(audio, np.ndarray): - audio, _ = librosa.load(audio, sr=sr) - - annots_audio = librosa.clicks( - times=annots, - sr=sr, - click_freq=1000, - length=len(audio), - ) - audio_mixed = audio + annots_audio - sf.write(str(save_path), audio_mixed, sr, subtype="PCM_24") - - -def plot_and_save_score_following_result( - wp, - ref_features, - input_features, - distance_func, - save_dir, - score_annots, - perf_annots, - frame_rate, - name=None, +def plot_alignment( + warping_path: np.ndarray, + perf_annots: np.ndarray, + perf_annots_predicted: np.ndarray, + save_dir: Path, + name: str, + score_y: Optional[np.ndarray] = None, + frame_rate: float = 1.0, + state_space: Optional[np.ndarray] = None, + ref_features: Optional[np.ndarray] = None, + input_features: Optional[np.ndarray] = None, + distance_func=None, ): - xmin = 0 # performance range - xmax = None - ymin = 0 # score range - ymax = None - - xmax = xmax if xmax is not None else input_features.shape[0] - 1 - ymax = ymax if ymax is not None else ref_features.shape[0] - 1 - x_indices = range(xmin, xmax + 1) - y_indices = range(ymin, ymax + 1) - - run_name = name or "results" - save_path = save_dir / f"wp_{run_name}.tsv" - save_nparray_to_csv(wp.T, save_path.as_posix()) - - dist = scipy.spatial.distance.cdist( - ref_features[y_indices, :], - input_features[x_indices, :], - metric=distance_func, - ) # [d, wy] - plt.figure(figsize=(10, 10)) - plt.imshow( - dist, - aspect="auto", - origin="lower", - interpolation="nearest", - extent=(xmin, xmax, ymin, ymax), + """Plot warping path, GT annotations, and predicted points in one figure. + + Layers (back to front): distance matrix → warping path → predicted → GT. + """ + save_dir.mkdir(parents=True, exist_ok=True) + gt = np.asarray(perf_annots, dtype=float) + pred = np.asarray(perf_annots_predicted, dtype=float) + n = min(len(gt), len(pred)) + gt, pred = gt[:n], pred[:n] + + has_dist_matrix = ( + ref_features is not None + and input_features is not None + and distance_func is not None ) - mask_perf = (xmin <= perf_annots * frame_rate) & (perf_annots * frame_rate <= xmax) - mask_score = (ymin <= score_annots * frame_rate) & ( - score_annots * frame_rate <= ymax + + fig, ax = plt.subplots(figsize=(30, 30)) + + if has_dist_matrix: + # DTW mode: everything in frame space + dist = scipy.spatial.distance.cdist( + ref_features, + input_features, + metric=distance_func, + ) + ax.imshow( + dist, + aspect="auto", + origin="lower", + interpolation="nearest", + extent=(0, input_features.shape[0] - 1, 0, ref_features.shape[0] - 1), + ) + x_gt = gt * float(frame_rate) + x_pred = pred * float(frame_rate) + if score_y is not None: + y = np.asarray(score_y, dtype=float)[:n] * float(frame_rate) + else: + y = np.arange(n) + ylabel = "score (frames)" + wp_x = warping_path[1] + wp_y = warping_path[0] + else: + # HMM mode: x in frames, y in beats via state_space + x_gt = gt * float(frame_rate) + x_pred = pred * float(frame_rate) + if score_y is None: + y = np.arange(n) + ylabel = "annotation index" + else: + y = np.asarray(score_y, dtype=float)[:n] + ylabel = "score position (beats)" + wp_x = warping_path[1] + if state_space is not None: + wp_y = state_space[warping_path[0]] + else: + wp_y = warping_path[0] + + # 1. Warping path + if has_dist_matrix: + ax.plot( + wp_x, + wp_y, + ".", + color="white", + alpha=0.7, + markersize=15, + label="warping path", + zorder=2, + ) + else: + ax.plot( + wp_x, + wp_y, + ".", + color="lime", + alpha=0.5, + markersize=15, + label="warping path", + zorder=2, + ) + + # 2. Predicted points + ax.scatter( + x_pred, + y, + label="predicted", + s=80, + alpha=0.9, + marker="o", + color="blue", + linewidths=0, + zorder=3, ) - plt.title( - f"[{save_dir.name}/{run_name}] \n Matchmaker alignment path with ground-truth labels", - fontsize=15, + + # 3. GT annotations (front) + ax.scatter( + x_gt, + y, + label="ground truth", + s=120, + alpha=0.9, + marker="x", + color="red", + linewidths=3, + zorder=4, ) - plt.xlabel("Performance Features", fontsize=15) - plt.ylabel("Score Features", fontsize=15) - - # plot online DTW path - cropped_history = [ - (ref, target) - for (ref, target) in wp.T - if xmin <= target <= xmax and ymin <= ref <= ymax - ] - for ref, target in cropped_history: - plt.plot(target, ref, ".", color="cyan", alpha=0.5, markersize=3) - - # plot ground-truth labels - for ref, target in zip(score_annots, perf_annots): - if (xmin <= target * frame_rate <= xmax) and (ymin <= ref * frame_rate <= ymax): - plt.plot( - target * frame_rate, - ref * frame_rate, - "x", - color="r", - alpha=1, - markersize=3, - markeredgewidth=3, - ) - plt.savefig(save_dir / f"{run_name}.png") + + ax.set_xlabel("performance frame") + ax.set_ylabel(ylabel) + ax.set_title(f"[{save_dir.name}] alignment ({name})") + ax.grid(True, alpha=0.2) + ax.legend(loc="best") + fig.tight_layout() + fig.savefig(save_dir / f"{name}.png", dpi=150) + plt.close(fig) def save_debug_results( - score_file, - score_audio, - score_annots, - score_annots_predicted, - perf_file, - perf_annots, - perf_annots_predicted, - model, - frame_rate, - save_dir=None, - run_name=None, + warping_path: np.ndarray, + score_annots: np.ndarray, + perf_annots: np.ndarray, + perf_annots_predicted: np.ndarray, + eval_results: dict, + frame_rate: float, + save_dir: Path, + run_name: str = "results", + state_space: Optional[np.ndarray] = None, + ref_features: Optional[np.ndarray] = None, + input_features: Optional[np.ndarray] = None, + distance_func=None, ): - # save score audio with beat annotations - score_audio_dir = Path("./score_audio") - score_audio_dir.mkdir(parents=True, exist_ok=True) - run_name_suffix = ( - f"{Path(perf_file).stem}_{run_name}" if run_name else f"{Path(perf_file).stem}" - ) - save_mixed_audio( - score_audio, - score_annots, - save_path=score_audio_dir - / f"score_audio_{Path(score_file).parent.parent.name}_{Path(score_file).stem}_{run_name_suffix}.wav", - ) - # save performance audio with beat annotations - perf_audio_dir = Path("./performance_audio") - perf_audio_dir.mkdir(parents=True, exist_ok=True) - save_mixed_audio( - perf_file, + """Save debug outputs: warping path TSV, results JSON, and alignment plot.""" + save_dir = Path(save_dir) + save_dir.mkdir(parents=True, exist_ok=True) + + # 1. Warping path TSV + results JSON + save_nparray_to_csv(warping_path.T, (save_dir / f"wp_{run_name}.tsv").as_posix()) + import json + + with open(save_dir / f"{run_name}.json", "w") as f: + json.dump(eval_results, f, indent=4) + + # 2. Alignment plot + if state_space is not None: + score_y = state_space + else: + sx = np.asarray(score_annots, dtype=float) + score_y = ( + sx + if sx.ndim == 1 and len(sx) == len(perf_annots) and np.all(np.diff(sx) >= 0) + else None + ) + plot_alignment( + warping_path, perf_annots, - save_path=perf_audio_dir - / f"perf_audio_{Path(perf_file).parent.parent.name}_{Path(perf_file).parent.name}_{run_name_suffix}.wav", - ) - # save score audio with predicted beat annotations - score_predicted_audio_dir = Path("./score_audio_predicted") - score_predicted_audio_dir.mkdir(parents=True, exist_ok=True) - save_mixed_audio( - score_audio, - score_annots_predicted, - save_path=score_predicted_audio_dir - / f"score_audio_{Path(score_file).parent.parent.name}_{Path(score_file).parent.name}_{run_name_suffix}.wav", - ) - # save performance audio with predicted beat annotations - perf_predicted_audio_dir = Path("./performance_audio_predicted") - perf_predicted_audio_dir.mkdir(parents=True, exist_ok=True) - save_mixed_audio( - perf_file, perf_annots_predicted, - save_path=perf_predicted_audio_dir - / f"perf_audio_{Path(perf_file).parent.parent.name}_{Path(perf_file).parent.name}_{run_name_suffix}.wav", - ) - # save score following plot result - save_dir = save_dir or Path("./tests/results") - save_dir.mkdir(parents=True, exist_ok=True) - plot_and_save_score_following_result( - model.warping_path, - model.reference_features, - model.input_features, - model.distance_func, save_dir, - score_annots, - perf_annots, - frame_rate, - name=run_name, + run_name, + score_y=score_y, + frame_rate=frame_rate, + state_space=state_space, + ref_features=ref_features, + input_features=input_features, + distance_func=distance_func, )