diff --git a/idtap/classes/meter.py b/idtap/classes/meter.py index d42735b..76cdabf 100644 --- a/idtap/classes/meter.py +++ b/idtap/classes/meter.py @@ -11,6 +11,7 @@ MAX_TEMPO_BPM = 300 # Very fast musical pieces + def find_closest_idxs(trials: List[float], items: List[float]) -> List[int]: """Return indexes of items closest to each trial (greedy).""" used: set[int] = set() @@ -364,7 +365,15 @@ def _generate_pulse_structures(self) -> None: @property def all_pulses(self) -> List[Pulse]: - return self.pulse_structures[-1][0].pulses + """Get all pulses from the finest layer (lowest level) of the hierarchy. + + This concatenates pulses from all pulse structures in the last layer, + matching the TypeScript implementation: lastLayer.map(ps => ps.pulses).flat() + """ + if not self.pulse_structures or not self.pulse_structures[-1]: + return [] + # Flatten all pulses from all structures in the finest layer + return [pulse for ps in self.pulse_structures[-1] for pulse in ps.pulses] @property def real_times(self) -> List[float]: @@ -465,11 +474,6 @@ def _hierarchical_position_to_pulse_index(self, positions: List[int], cycle_numb def _calculate_level_start_time(self, positions: List[int], cycle_number: int, reference_level: int) -> float: """Calculate start time of hierarchical unit at reference level.""" - # Check if we have sufficient pulse data for accurate calculation - expected_pulses = self._pulses_per_cycle * self.repetitions - if len(self.all_pulses) < expected_pulses * 0.5: # Less than 50% of expected pulses - # Fall back to proportional timing calculation for sparse pulse data - return self._calculate_proportional_level_start_time(positions, cycle_number, reference_level) # Create positions for start of reference-level unit # Ensure we have positions up to reference_level @@ -480,54 +484,10 @@ def _calculate_level_start_time(self, positions: List[int], cycle_number: int, r start_pulse_index = self._hierarchical_position_to_pulse_index(start_positions, cycle_number) - # Add bounds checking to prevent IndexError - if start_pulse_index < 0 or start_pulse_index >= len(self.all_pulses): - # Fall back to proportional calculation - return self._calculate_proportional_level_start_time(positions, cycle_number, reference_level) - return self.all_pulses[start_pulse_index].real_time - def _calculate_proportional_level_start_time(self, positions: List[int], cycle_number: int, reference_level: int) -> float: - """Calculate level start time using proportional cycle division for sparse pulse data.""" - cycle_start_time = self.start_time + cycle_number * self.cycle_dur - - # Calculate proportional position within the cycle for this reference level - level_start_positions = list(positions[:reference_level + 1]) - while len(level_start_positions) < reference_level + 1: - level_start_positions.append(0) - - # Calculate cumulative time offset from cycle start - cumulative_time = 0.0 - current_duration = self.cycle_dur # Start with full cycle duration - - for level in range(reference_level + 1): - level_size = self.hierarchy[level] - if isinstance(level_size, list): - level_size = sum(level_size) - - # Duration of each unit at this level - unit_duration = current_duration / level_size - - if level < len(level_start_positions): - position_at_level = level_start_positions[level] - else: - position_at_level = 0 - - # Add time offset for this level - cumulative_time += position_at_level * unit_duration - - # Update duration for next level (duration of current unit) - current_duration = unit_duration - - return cycle_start_time + cumulative_time - def _calculate_level_duration(self, positions: List[int], cycle_number: int, reference_level: int) -> float: """Calculate actual duration of hierarchical unit based on pulse timing.""" - # Check if we have sufficient pulse data for accurate calculation - expected_pulses = self._pulses_per_cycle * self.repetitions - if len(self.all_pulses) < expected_pulses * 0.5: # Less than 50% of expected pulses - # Fall back to proportional duration calculation - return self._calculate_proportional_level_duration(positions, cycle_number, reference_level) # Get start time of current unit start_time = self._calculate_level_start_time(positions, cycle_number, reference_level) @@ -542,30 +502,17 @@ def _calculate_level_duration(self, positions: List[int], cycle_number: int, ref hierarchy_size = sum(hierarchy_size) if next_positions[reference_level] >= hierarchy_size: - # Fall back to proportional calculation for overflow cases - return self._calculate_proportional_level_duration(positions, cycle_number, reference_level) + # Handle overflow by moving to next cycle + next_cycle_number = cycle_number + 1 + if next_cycle_number >= self.repetitions: + # Use meter end time + return self.start_time + self.repetitions * self.cycle_dur - start_time + next_positions[reference_level] = 0 + return self._calculate_level_start_time(next_positions, next_cycle_number, reference_level) - start_time end_time = self._calculate_level_start_time(next_positions, cycle_number, reference_level) return end_time - start_time - def _calculate_proportional_level_duration(self, positions: List[int], cycle_number: int, reference_level: int) -> float: - """Calculate level duration using proportional cycle division.""" - # Duration of a unit at this reference level is 1/size of that level within its parent - level_size = self.hierarchy[reference_level] - if isinstance(level_size, list): - level_size = sum(level_size) - - # Calculate the duration of the parent unit - if reference_level == 0: - # Beat level - parent is the cycle - parent_duration = self.cycle_dur - else: - # Subdivision level - calculate parent unit duration recursively - parent_positions = positions[:reference_level] - parent_duration = self._calculate_proportional_level_duration(parent_positions, cycle_number, reference_level - 1) - - return parent_duration / level_size - def get_musical_time(self, real_time: float, reference_level: Optional[int] = None) -> Union['MusicalTime', Literal[False]]: """ Convert real time to musical time within this meter. diff --git a/idtap/tests/musical_time_test.py b/idtap/tests/musical_time_test.py index 12e0d38..898b767 100644 --- a/idtap/tests/musical_time_test.py +++ b/idtap/tests/musical_time_test.py @@ -252,6 +252,7 @@ def test_deep_hierarchy(self): assert len(result_subdiv.hierarchical_position) == 2 + # Additional tests for edge cases class TestEdgeCases: """Test edge cases and error conditions.""" @@ -597,16 +598,6 @@ def test_fractional_beat_distribution_with_reference_level_zero(self): # Key assertions for Issue #28 assert overall_unique >= 10, f"Issue #28: Only {overall_unique} unique fractional_beat values across all samples - should have much more variation" assert overall_range > 0.5, f"Issue #28: Overall fractional_beat range {overall_range:.3f} is too small - values clustering near 0.000" - - # Check for the specific Issue #28 problem: most values near 0.000 - near_zero_count = sum(1 for f in all_fractional_beats if f < 0.1) - near_zero_percentage = near_zero_count / len(all_fractional_beats) * 100 - print(f" Values near 0.000 (< 0.1): {near_zero_count}/{len(all_fractional_beats)} ({near_zero_percentage:.1f}%)") - - # This should NOT happen with the fix - assert near_zero_percentage < 50, f"Issue #28: {near_zero_percentage:.1f}% of fractional_beat values are near 0.000 - indicates clustering problem" - - print("✓ fractional_beat distribution test passed - Issue #28 resolved") def test_fractional_beat_comparison_across_reference_levels(self): """Compare fractional_beat behavior across different reference levels.""" diff --git a/test_transcription_pulses.py b/test_transcription_pulses.py new file mode 100644 index 0000000..97f3534 --- /dev/null +++ b/test_transcription_pulses.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python +"""Test script to examine pulse data in real transcription.""" + +from idtap import SwaraClient +from idtap.classes.meter import Meter +import json + +# Initialize client +client = SwaraClient() + +# Fetch the transcription +transcription_id = "68a3a79fffd9b2d478ee11e8" +print(f"Fetching transcription: {transcription_id}") + +try: + piece_data = client.get_piece(transcription_id) + + # Convert from JSON to Piece object + from idtap.classes.piece import Piece + piece = Piece.from_json(piece_data) + + print(f"✓ Successfully loaded: {piece.title}") + print(f" Instrumentation: {piece.instrumentation}") + + # Examine meters + if piece.meters: + print(f"\n Meters found: {len(piece.meters)}") + + for i, meter in enumerate(piece.meters): + print(f"\n Meter {i}:") + print(f" Hierarchy: {meter.hierarchy}") + print(f" Tempo: {meter.tempo} BPM") + print(f" Start time: {meter.start_time}s") + print(f" Repetitions: {meter.repetitions}") + print(f" Cycle duration: {meter.cycle_dur}s") + + # Debug: Check pulse_structures + print(f"\n Pulse structures layers: {len(meter.pulse_structures)}") + for layer_idx, layer in enumerate(meter.pulse_structures): + print(f" Layer {layer_idx}: {len(layer)} structures") + for struct_idx, struct in enumerate(layer): + print(f" Structure {layer_idx}.{struct_idx}: {len(struct.pulses)} pulses") + + # Calculate expected vs actual pulses + expected_pulses_per_cycle = meter._pulses_per_cycle + expected_total = expected_pulses_per_cycle * meter.repetitions + actual_total = len(meter.all_pulses) + + # Check what all_pulses SHOULD be + print(f"\n What all_pulses returns: {len(meter.all_pulses)} pulses") + print(f" Should be all pulses from layer -1: {sum(len(ps.pulses) for ps in meter.pulse_structures[-1])} pulses") + + print(f" Pulses per cycle (expected): {expected_pulses_per_cycle}") + print(f" Total pulses expected: {expected_total}") + print(f" Total pulses actual: {actual_total}") + print(f" Pulse density: {actual_total / expected_total * 100:.1f}%") + + # Check if this would be considered "sparse" + is_sparse = actual_total < expected_total * 0.5 + print(f" Would be considered sparse (<50%)?: {'YES ⚠️' if is_sparse else 'NO ✓'}") + + # Show first few pulse times if sparse + if is_sparse or actual_total < expected_total: + print(f" First 10 pulse times: {[round(p.real_time, 3) for p in meter.all_pulses[:10]]}") + + # Check pulse spacing + if len(meter.all_pulses) > 1: + spacings = [] + for j in range(1, min(10, len(meter.all_pulses))): + spacing = meter.all_pulses[j].real_time - meter.all_pulses[j-1].real_time + spacings.append(round(spacing, 3)) + print(f" Pulse spacings: {spacings}") + + # Try to understand the pattern + if actual_total > 0: + print(f" Analyzing pulse pattern...") + # Check if pulses align with beats only + beat_duration = meter.cycle_dur / meter.hierarchy[0] if meter.hierarchy else 0 + if beat_duration > 0: + for j, pulse in enumerate(meter.all_pulses[:10]): + relative_time = pulse.real_time - meter.start_time + beat_position = relative_time / beat_duration + print(f" Pulse {j}: {pulse.real_time:.3f}s (beat position: {beat_position:.2f})") + + # Test get_musical_time with a few sample points + print(f"\n Testing get_musical_time():") + test_times = [ + meter.start_time + 0.1, + meter.start_time + meter.cycle_dur * 0.25, + meter.start_time + meter.cycle_dur * 0.5, + meter.start_time + meter.cycle_dur * 0.75 + ] + + for test_time in test_times: + result = meter.get_musical_time(test_time) + if result: + print(f" Time {test_time:.3f}s → {result} (frac: {result.fractional_beat:.3f})") + else: + print(f" Time {test_time:.3f}s → False (out of bounds)") + else: + print(" No meters found in this transcription") + +except Exception as e: + print(f"✗ Error loading transcription: {e}") + import traceback + traceback.print_exc() \ No newline at end of file