Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 190 additions & 80 deletions idtap/classes/meter.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,116 @@ def add_time_points(self, time_points: List[float], layer: int = 1) -> None:
self.pulse_structures[0][0].pulses.sort(key=lambda p: p.real_time)

@staticmethod
def from_time_points(time_points: List[float], hierarchy: List[Union[int, List[int]]],
repetitions: int = 1, layer: int = 0) -> 'Meter':
"""Create a Meter from actual pulse time points, handling timing variations.

This method creates a meter that accurately represents actual pulse timing
(including rubato and tempo variations) rather than theoretical even spacing.
Uses timing regularization algorithm to handle extreme deviations.

Args:
time_points: List of actual pulse times in seconds
hierarchy: Meter hierarchy (e.g., [4, 4, 2])
repetitions: Number of cycle repetitions
layer: Which hierarchical layer the time points represent (0 or 1)

Returns:
Meter object with pulses positioned at the provided time points
"""
if not time_points or len(time_points) < 2:
raise ValueError("Must provide at least two time points")

if not hierarchy or len(hierarchy) < 1:
raise ValueError("Must provide hierarchy to create Meter")

# Work on a copy to avoid modifying the original
time_points = sorted(time_points.copy())

# Step 1: Timing regularization algorithm (from TypeScript)
# Calculate pulse duration and handle extreme deviations
diffs = [time_points[i+1] - time_points[i] for i in range(len(time_points) - 1)]
pulse_dur = sum(diffs) / len(diffs)

# Normalize deviations relative to pulse duration
zeroed_tps = [tp - time_points[0] for tp in time_points]
norms = [pulse_dur * i for i in range(len(time_points))]
tp_diffs = [(zeroed_tps[i] - norms[i]) / pulse_dur for i in range(len(time_points))]

# Insert intermediate time points when deviations exceed 40%
max_iterations = 100 # Prevent infinite loops
iteration = 0
while any(abs(d) > 0.4 for d in tp_diffs) and iteration < max_iterations:
abs_tp_diffs = [abs(d) for d in tp_diffs]
biggest_idx = abs_tp_diffs.index(max(abs_tp_diffs))
diff = tp_diffs[biggest_idx]

if diff > 0:
# Insert time point before the problematic one
if biggest_idx > 0:
new_tp = (time_points[biggest_idx-1] + time_points[biggest_idx]) / 2
time_points.insert(biggest_idx, new_tp)
else:
# Can't insert before first point, adjust differently
break
else:
# Insert time point after the problematic one
if biggest_idx < len(time_points) - 1:
new_tp = (time_points[biggest_idx] + time_points[biggest_idx+1]) / 2
time_points.insert(biggest_idx+1, new_tp)
else:
# Can't insert after last point, adjust differently
break

# Recalculate deviations
diffs = [time_points[i+1] - time_points[i] for i in range(len(time_points) - 1)]
pulse_dur = sum(diffs) / len(diffs)
zeroed_tps = [tp - time_points[0] for tp in time_points]
norms = [pulse_dur * i for i in range(len(time_points))]
tp_diffs = [(zeroed_tps[i] - norms[i]) / pulse_dur for i in range(len(time_points))]

iteration += 1

# Step 2: Calculate meter properties
tempo = 60.0 / pulse_dur
start_time = time_points[0]

# Determine how many repetitions we need based on time points
if isinstance(hierarchy[0], list):
layer0_size = sum(hierarchy[0])
else:
layer0_size = hierarchy[0]

# Calculate minimum repetitions needed
min_reps = max(repetitions, (len(time_points) + layer0_size - 1) // layer0_size)

# Step 3: Create theoretical meter
meter = Meter(
hierarchy=hierarchy,
start_time=start_time,
tempo=tempo,
repetitions=min_reps
)

# Step 4: Adjust pulses to match actual time points
finest_pulses = meter.all_pulses

# Update pulse times to match provided time points
for i, time_point in enumerate(time_points):
if i < len(finest_pulses):
finest_pulses[i].real_time = time_point

# If we have fewer time points than pulses, extrapolate the remaining
if len(time_points) < len(finest_pulses):
# Use the calculated pulse duration to extrapolate
last_provided_time = time_points[-1]
for i in range(len(time_points), len(finest_pulses)):
extrapolated_time = last_provided_time + (i - len(time_points) + 1) * pulse_dur
finest_pulses[i].real_time = extrapolated_time

return meter

@staticmethod
def from_json(obj: Dict) -> 'Meter':
m = Meter(hierarchy=obj.get('hierarchy'),
start_time=obj.get('startTime', 0.0),
Expand Down Expand Up @@ -564,113 +674,113 @@ def get_musical_time(self, real_time: float, reference_level: Optional[int] = No
if real_time < self.start_time:
return False

# Calculate proper end time based on actual pulse timing
# For intermediate cycles: use actual next cycle start pulse
# For final cycle: use theoretical calculation (no next cycle exists)
# Calculate proper end time
if self.all_pulses and len(self.all_pulses) > 0:
# Calculate which cycle this time would fall into
relative_time = real_time - self.start_time
potential_cycle = int(relative_time // self.cycle_dur)

if potential_cycle < self.repetitions - 1:
# This is an intermediate cycle - use actual next cycle start pulse
next_cycle_first_pulse_index = (potential_cycle + 1) * self._pulses_per_cycle
if next_cycle_first_pulse_index < len(self.all_pulses):
actual_end_time = self.all_pulses[next_cycle_first_pulse_index].real_time
else:
# Fallback to theoretical if pulse doesn't exist
actual_end_time = self.start_time + self.repetitions * self.cycle_dur
else:
# This is the final cycle - use theoretical end time
actual_end_time = self.start_time + self.repetitions * self.cycle_dur
else:
# No pulses available - use theoretical calculation
# For boundary validation, use theoretical end time to maintain compatibility with existing tests
# The pulse-based logic will handle actual cycle boundaries in the main calculation
actual_end_time = self.start_time + self.repetitions * self.cycle_dur
else:
# No pulses available - this should not happen as we require pulse data
raise ValueError("No pulse data available for meter. Pulse data is required for musical time calculation.")

if real_time > actual_end_time:
return False

# Validate reference level
ref_level = self._validate_reference_level(reference_level)

# Step 2: Cycle calculation
relative_time = real_time - self.start_time
cycle_number = int(relative_time // self.cycle_dur)
cycle_offset = relative_time % self.cycle_dur

# Step 3: Hierarchical position calculation
positions = []
remaining_time = cycle_offset
# Step 2: Pulse-based cycle calculation (pulse data always available)
if not self.all_pulses or len(self.all_pulses) == 0:
raise ValueError(f"No pulse data available for meter. Pulse data is required for musical time calculation.")

total_finest_subdivisions = self._pulses_per_cycle
current_group_size = total_finest_subdivisions
cycle_number = None
cycle_offset = None

for size in self.hierarchy:
if isinstance(size, list):
size = sum(size)

current_group_size = current_group_size // size
subdivision_duration = current_group_size * self._pulse_dur
for cycle in range(self.repetitions):
cycle_start_pulse_idx = cycle * self._pulses_per_cycle

if subdivision_duration <= 0:
position_at_level = 0
# Get actual cycle start time
if cycle_start_pulse_idx < len(self.all_pulses):
cycle_start_time = self.all_pulses[cycle_start_pulse_idx].real_time

# Get actual cycle end time
next_cycle_start_pulse_idx = (cycle + 1) * self._pulses_per_cycle
if next_cycle_start_pulse_idx < len(self.all_pulses):
cycle_end_time = self.all_pulses[next_cycle_start_pulse_idx].real_time
else:
# Final cycle - use theoretical end
cycle_end_time = self.start_time + self.repetitions * self.cycle_dur

# Check if time falls within this cycle
# For the final cycle, include the exact end time (Issue #38 fix)
if cycle == self.repetitions - 1:
# Final cycle: include exact end time
if cycle_start_time <= real_time <= cycle_end_time:
cycle_number = cycle
cycle_offset = real_time - cycle_start_time
break
else:
# Intermediate cycles: exclude end time (it belongs to next cycle)
if cycle_start_time <= real_time < cycle_end_time:
cycle_number = cycle
cycle_offset = real_time - cycle_start_time
break

# Error if no pulse-based cycle found - indicates data integrity issue
if cycle_number is None:
raise ValueError(f"Unable to determine cycle for time {real_time} using pulse data. "
f"Time does not fall within any pulse-based cycle boundaries. "
f"This indicates a problem with meter pulse data integrity.")

# Step 3: Fractional beat calculation
# Find the correct pulse based on actual time, not hierarchical position
# This is necessary when pulse timing has variations (rubato)

# Find the pulse that comes at or before the query time within the current cycle
cycle_start_pulse_idx = cycle_number * self._pulses_per_cycle
cycle_end_pulse_idx = min((cycle_number + 1) * self._pulses_per_cycle, len(self.all_pulses))

current_pulse_index = None
for pulse_idx in range(cycle_start_pulse_idx, cycle_end_pulse_idx):
if self.all_pulses[pulse_idx].real_time <= real_time:
current_pulse_index = pulse_idx
else:
position_at_level = int(remaining_time // subdivision_duration)

positions.append(position_at_level)

if subdivision_duration > 0:
remaining_time = remaining_time % subdivision_duration
break

# Step 4: Fractional beat calculation
# ALWAYS calculate fractional_beat as position within finest level unit (between pulses)
# This is independent of reference_level, which only affects hierarchical_position truncation
current_pulse_index = self._hierarchical_position_to_pulse_index(positions, cycle_number)
if current_pulse_index is None:
# Query time is before all pulses in this cycle (shouldn't happen but handle gracefully)
current_pulse_index = cycle_start_pulse_idx

# Bounds checking and hierarchical position correction
if current_pulse_index < 0 or current_pulse_index >= len(self.all_pulses):
fractional_beat = 0.0
else:
current_pulse_time = self.all_pulses[current_pulse_index].real_time

# FIX: If the calculated pulse comes after the query time, find the correct pulse
# This happens when hierarchical position calculation rounds up due to timing variations
if current_pulse_time > real_time:
# Find the pulse that comes at or before the query time
corrected_pulse_index = current_pulse_index
while corrected_pulse_index > 0 and self.all_pulses[corrected_pulse_index].real_time > real_time:
corrected_pulse_index -= 1
current_pulse_index = corrected_pulse_index
current_pulse_time = self.all_pulses[current_pulse_index].real_time

# Update positions to reflect the corrected pulse index
# This ensures consistency between hierarchical_position and actual pulse used
positions = self._pulse_index_to_hierarchical_position(current_pulse_index, cycle_number)

# Handle next pulse
if current_pulse_index + 1 < len(self.all_pulses):
next_pulse_time = self.all_pulses[current_pulse_index + 1].real_time
else:
# Last pulse - use next cycle start
next_cycle_start = self.start_time + (cycle_number + 1) * self.cycle_dur
next_pulse_time = next_cycle_start

current_pulse_time = self.all_pulses[current_pulse_index].real_time

# Update positions to reflect the actual pulse found
positions = self._pulse_index_to_hierarchical_position(current_pulse_index, cycle_number)

# Find next pulse for fractional calculation - always use pulse-based logic
if current_pulse_index + 1 < len(self.all_pulses):
next_pulse_time = self.all_pulses[current_pulse_index + 1].real_time
pulse_duration = next_pulse_time - current_pulse_time

if pulse_duration <= 0:
fractional_beat = 0.0
else:
time_from_current_pulse = real_time - current_pulse_time
fractional_beat = time_from_current_pulse / pulse_duration
else:
# This is the last pulse - fractional_beat should be 0.0 since we can't calculate duration
fractional_beat = 0.0

# Clamp to [0, 1] range
fractional_beat = max(0.0, min(1.0, fractional_beat))
# Note: fractional_beat calculation may need refinement when hierarchical position
# calculation finds the wrong pulse due to timing variations, but clamping ensures valid range
# Clamp to [0, 1) range (exclusive upper bound for MusicalTime)
fractional_beat = max(0.0, min(0.9999999999999999, fractional_beat))

# Step 5: Handle reference level truncation (if specified)
# Step 4: Handle reference level truncation (if specified)
if ref_level is not None and ref_level < len(self.hierarchy) - 1:
# Truncate positions to reference level for final result
positions = positions[:ref_level + 1]

# Step 6: Result construction
# Step 5: Result construction
return MusicalTime(
cycle_number=cycle_number,
hierarchical_position=positions,
Expand Down
51 changes: 49 additions & 2 deletions idtap/tests/musical_time_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -872,8 +872,8 @@ def test_issue_38_cycle_boundary_failures(self):
assert result.cycle_number == cycle, f"Boundary {boundary_time} should be in cycle {cycle}"
assert result.hierarchical_position[0] == 0, f"Boundary should be at start of hierarchical position"
else:
# Final boundary - should be treated as start of theoretical next cycle
assert result.cycle_number == cycle, f"Final boundary should indicate cycle {cycle}"
# Final boundary - with pulse-based calculation, this falls in the final actual cycle
assert result.cycle_number == cycle - 1, f"Final boundary should be in final cycle {cycle - 1} (pulse-based)"

# Test times very close to boundaries to ensure they also work
for cycle in range(meter.repetitions):
Expand All @@ -886,3 +886,50 @@ def test_issue_38_cycle_boundary_failures(self):

# Should be in the previous cycle
assert result.cycle_number == cycle, f"Time before boundary should be in cycle {cycle}"

def test_issue_40_cycle_number_correction(self):
"""Test fix for Issue #40: get_musical_time() returns incorrect cycle numbers at cycle boundaries.

Tests that when pulse data has timing variations (rubato), get_musical_time() uses
actual pulse-based cycle boundaries instead of theoretical boundaries.
"""
# Create meter similar to Issue #40 transcription
meter = Meter(hierarchy=[4, 4, 2], start_time=4.093, tempo=58.3, repetitions=8)

# Simulate timing variations by adjusting specific pulses to match Issue #40 boundaries
# Issue #40 cycle boundaries:
# Cycle 3: 16.597 - 20.727 (time 20.601 should be in cycle 3, not 4)
expected_boundaries = [4.093, 8.350, 12.480, 16.597, 20.727, 24.844, 28.968, 33.121, 37.268]

# Adjust pulse timing to match expected boundaries
for cycle in range(len(expected_boundaries) - 1):
cycle_start_pulse_idx = cycle * meter._pulses_per_cycle
if cycle_start_pulse_idx < len(meter.all_pulses):
# Set first pulse of each cycle to exact boundary time
meter.all_pulses[cycle_start_pulse_idx].real_time = expected_boundaries[cycle]

# Adjust remaining pulses in cycle proportionally
cycle_duration = expected_boundaries[cycle + 1] - expected_boundaries[cycle]
for pulse_in_cycle in range(1, 32): # Pulses 1-31 in cycle
pulse_idx = cycle_start_pulse_idx + pulse_in_cycle
if pulse_idx < len(meter.all_pulses):
pulse_time = expected_boundaries[cycle] + (pulse_in_cycle * cycle_duration / 32)
meter.all_pulses[pulse_idx].real_time = pulse_time

# Re-sort pulses by time after modifications
meter.pulse_structures[0][0].pulses.sort(key=lambda p: p.real_time)

# Test the specific Issue #40 case: trajectory in cycle 4
# Time 20.601 should be in cycle 3 (16.597 - 20.727), not cycle 4
test_cases = [
(20.600969, 3, "Start of trajectory 'n' - should be cycle 3"),
(20.602238, 3, "Part of trajectory 'n' - should be cycle 3"),
(20.726, 3, "Just before cycle 4 boundary - should be cycle 3"),
(20.727, 4, "Exactly at cycle 4 boundary - should be cycle 4"),
]

for time, expected_cycle, description in test_cases:
result = meter.get_musical_time(time)
assert result is not False, f"Time {time} should return valid musical time ({description})"
assert result.cycle_number == expected_cycle, \
f"Time {time}: expected cycle {expected_cycle}, got {result.cycle_number} ({description})"
Loading