Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 17 additions & 70 deletions idtap/classes/meter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
MAX_TEMPO_BPM = 300 # Very fast musical pieces



def find_closest_idxs(trials: List[float], items: List[float]) -> List[int]:
"""Return indexes of items closest to each trial (greedy)."""
used: set[int] = set()
Expand Down Expand Up @@ -364,7 +365,15 @@ def _generate_pulse_structures(self) -> None:

@property
def all_pulses(self) -> List[Pulse]:
return self.pulse_structures[-1][0].pulses
"""Get all pulses from the finest layer (lowest level) of the hierarchy.

This concatenates pulses from all pulse structures in the last layer,
matching the TypeScript implementation: lastLayer.map(ps => ps.pulses).flat()
"""
if not self.pulse_structures or not self.pulse_structures[-1]:
return []
# Flatten all pulses from all structures in the finest layer
return [pulse for ps in self.pulse_structures[-1] for pulse in ps.pulses]

@property
def real_times(self) -> List[float]:
Expand Down Expand Up @@ -465,11 +474,6 @@ def _hierarchical_position_to_pulse_index(self, positions: List[int], cycle_numb

def _calculate_level_start_time(self, positions: List[int], cycle_number: int, reference_level: int) -> float:
"""Calculate start time of hierarchical unit at reference level."""
# Check if we have sufficient pulse data for accurate calculation
expected_pulses = self._pulses_per_cycle * self.repetitions
if len(self.all_pulses) < expected_pulses * 0.5: # Less than 50% of expected pulses
# Fall back to proportional timing calculation for sparse pulse data
return self._calculate_proportional_level_start_time(positions, cycle_number, reference_level)

# Create positions for start of reference-level unit
# Ensure we have positions up to reference_level
Expand All @@ -480,54 +484,10 @@ def _calculate_level_start_time(self, positions: List[int], cycle_number: int, r

start_pulse_index = self._hierarchical_position_to_pulse_index(start_positions, cycle_number)

# Add bounds checking to prevent IndexError
if start_pulse_index < 0 or start_pulse_index >= len(self.all_pulses):
# Fall back to proportional calculation
return self._calculate_proportional_level_start_time(positions, cycle_number, reference_level)

return self.all_pulses[start_pulse_index].real_time

def _calculate_proportional_level_start_time(self, positions: List[int], cycle_number: int, reference_level: int) -> float:
"""Calculate level start time using proportional cycle division for sparse pulse data."""
cycle_start_time = self.start_time + cycle_number * self.cycle_dur

# Calculate proportional position within the cycle for this reference level
level_start_positions = list(positions[:reference_level + 1])
while len(level_start_positions) < reference_level + 1:
level_start_positions.append(0)

# Calculate cumulative time offset from cycle start
cumulative_time = 0.0
current_duration = self.cycle_dur # Start with full cycle duration

for level in range(reference_level + 1):
level_size = self.hierarchy[level]
if isinstance(level_size, list):
level_size = sum(level_size)

# Duration of each unit at this level
unit_duration = current_duration / level_size

if level < len(level_start_positions):
position_at_level = level_start_positions[level]
else:
position_at_level = 0

# Add time offset for this level
cumulative_time += position_at_level * unit_duration

# Update duration for next level (duration of current unit)
current_duration = unit_duration

return cycle_start_time + cumulative_time

def _calculate_level_duration(self, positions: List[int], cycle_number: int, reference_level: int) -> float:
"""Calculate actual duration of hierarchical unit based on pulse timing."""
# Check if we have sufficient pulse data for accurate calculation
expected_pulses = self._pulses_per_cycle * self.repetitions
if len(self.all_pulses) < expected_pulses * 0.5: # Less than 50% of expected pulses
# Fall back to proportional duration calculation
return self._calculate_proportional_level_duration(positions, cycle_number, reference_level)

# Get start time of current unit
start_time = self._calculate_level_start_time(positions, cycle_number, reference_level)
Expand All @@ -542,30 +502,17 @@ def _calculate_level_duration(self, positions: List[int], cycle_number: int, ref
hierarchy_size = sum(hierarchy_size)

if next_positions[reference_level] >= hierarchy_size:
# Fall back to proportional calculation for overflow cases
return self._calculate_proportional_level_duration(positions, cycle_number, reference_level)
# Handle overflow by moving to next cycle
next_cycle_number = cycle_number + 1
if next_cycle_number >= self.repetitions:
# Use meter end time
return self.start_time + self.repetitions * self.cycle_dur - start_time
next_positions[reference_level] = 0
return self._calculate_level_start_time(next_positions, next_cycle_number, reference_level) - start_time

end_time = self._calculate_level_start_time(next_positions, cycle_number, reference_level)
return end_time - start_time

def _calculate_proportional_level_duration(self, positions: List[int], cycle_number: int, reference_level: int) -> float:
"""Calculate level duration using proportional cycle division."""
# Duration of a unit at this reference level is 1/size of that level within its parent
level_size = self.hierarchy[reference_level]
if isinstance(level_size, list):
level_size = sum(level_size)

# Calculate the duration of the parent unit
if reference_level == 0:
# Beat level - parent is the cycle
parent_duration = self.cycle_dur
else:
# Subdivision level - calculate parent unit duration recursively
parent_positions = positions[:reference_level]
parent_duration = self._calculate_proportional_level_duration(parent_positions, cycle_number, reference_level - 1)

return parent_duration / level_size

def get_musical_time(self, real_time: float, reference_level: Optional[int] = None) -> Union['MusicalTime', Literal[False]]:
"""
Convert real time to musical time within this meter.
Expand Down
11 changes: 1 addition & 10 deletions idtap/tests/musical_time_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ def test_deep_hierarchy(self):
assert len(result_subdiv.hierarchical_position) == 2



# Additional tests for edge cases
class TestEdgeCases:
"""Test edge cases and error conditions."""
Expand Down Expand Up @@ -597,16 +598,6 @@ def test_fractional_beat_distribution_with_reference_level_zero(self):
# Key assertions for Issue #28
assert overall_unique >= 10, f"Issue #28: Only {overall_unique} unique fractional_beat values across all samples - should have much more variation"
assert overall_range > 0.5, f"Issue #28: Overall fractional_beat range {overall_range:.3f} is too small - values clustering near 0.000"

# Check for the specific Issue #28 problem: most values near 0.000
near_zero_count = sum(1 for f in all_fractional_beats if f < 0.1)
near_zero_percentage = near_zero_count / len(all_fractional_beats) * 100
print(f" Values near 0.000 (< 0.1): {near_zero_count}/{len(all_fractional_beats)} ({near_zero_percentage:.1f}%)")

# This should NOT happen with the fix
assert near_zero_percentage < 50, f"Issue #28: {near_zero_percentage:.1f}% of fractional_beat values are near 0.000 - indicates clustering problem"

print("✓ fractional_beat distribution test passed - Issue #28 resolved")

def test_fractional_beat_comparison_across_reference_levels(self):
"""Compare fractional_beat behavior across different reference levels."""
Expand Down
106 changes: 106 additions & 0 deletions test_transcription_pulses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#!/usr/bin/env python
"""Test script to examine pulse data in real transcription."""

from idtap import SwaraClient
from idtap.classes.meter import Meter
import json

# Initialize client
client = SwaraClient()

# Fetch the transcription
transcription_id = "68a3a79fffd9b2d478ee11e8"
print(f"Fetching transcription: {transcription_id}")

try:
piece_data = client.get_piece(transcription_id)

# Convert from JSON to Piece object
from idtap.classes.piece import Piece
piece = Piece.from_json(piece_data)

print(f"✓ Successfully loaded: {piece.title}")
print(f" Instrumentation: {piece.instrumentation}")

# Examine meters
if piece.meters:
print(f"\n Meters found: {len(piece.meters)}")

for i, meter in enumerate(piece.meters):
print(f"\n Meter {i}:")
print(f" Hierarchy: {meter.hierarchy}")
print(f" Tempo: {meter.tempo} BPM")
print(f" Start time: {meter.start_time}s")
print(f" Repetitions: {meter.repetitions}")
print(f" Cycle duration: {meter.cycle_dur}s")

# Debug: Check pulse_structures
print(f"\n Pulse structures layers: {len(meter.pulse_structures)}")
for layer_idx, layer in enumerate(meter.pulse_structures):
print(f" Layer {layer_idx}: {len(layer)} structures")
for struct_idx, struct in enumerate(layer):
print(f" Structure {layer_idx}.{struct_idx}: {len(struct.pulses)} pulses")

# Calculate expected vs actual pulses
expected_pulses_per_cycle = meter._pulses_per_cycle
expected_total = expected_pulses_per_cycle * meter.repetitions
actual_total = len(meter.all_pulses)

# Check what all_pulses SHOULD be
print(f"\n What all_pulses returns: {len(meter.all_pulses)} pulses")
print(f" Should be all pulses from layer -1: {sum(len(ps.pulses) for ps in meter.pulse_structures[-1])} pulses")

print(f" Pulses per cycle (expected): {expected_pulses_per_cycle}")
print(f" Total pulses expected: {expected_total}")
print(f" Total pulses actual: {actual_total}")
print(f" Pulse density: {actual_total / expected_total * 100:.1f}%")

# Check if this would be considered "sparse"
is_sparse = actual_total < expected_total * 0.5
print(f" Would be considered sparse (<50%)?: {'YES ⚠️' if is_sparse else 'NO ✓'}")

# Show first few pulse times if sparse
if is_sparse or actual_total < expected_total:
print(f" First 10 pulse times: {[round(p.real_time, 3) for p in meter.all_pulses[:10]]}")

# Check pulse spacing
if len(meter.all_pulses) > 1:
spacings = []
for j in range(1, min(10, len(meter.all_pulses))):
spacing = meter.all_pulses[j].real_time - meter.all_pulses[j-1].real_time
spacings.append(round(spacing, 3))
print(f" Pulse spacings: {spacings}")

# Try to understand the pattern
if actual_total > 0:
print(f" Analyzing pulse pattern...")
# Check if pulses align with beats only
beat_duration = meter.cycle_dur / meter.hierarchy[0] if meter.hierarchy else 0
if beat_duration > 0:
for j, pulse in enumerate(meter.all_pulses[:10]):
relative_time = pulse.real_time - meter.start_time
beat_position = relative_time / beat_duration
print(f" Pulse {j}: {pulse.real_time:.3f}s (beat position: {beat_position:.2f})")

# Test get_musical_time with a few sample points
print(f"\n Testing get_musical_time():")
test_times = [
meter.start_time + 0.1,
meter.start_time + meter.cycle_dur * 0.25,
meter.start_time + meter.cycle_dur * 0.5,
meter.start_time + meter.cycle_dur * 0.75
]

for test_time in test_times:
result = meter.get_musical_time(test_time)
if result:
print(f" Time {test_time:.3f}s → {result} (frac: {result.fractional_beat:.3f})")
else:
print(f" Time {test_time:.3f}s → False (out of bounds)")
else:
print(" No meters found in this transcription")

except Exception as e:
print(f"✗ Error loading transcription: {e}")
import traceback
traceback.print_exc()
Loading