diff --git a/src/arduino/app_bricks/wave_generator/README.md b/src/arduino/app_bricks/wave_generator/README.md new file mode 100644 index 00000000..3e29e3d5 --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/README.md @@ -0,0 +1,312 @@ +# Wave Generator Brick + +A continuous wave generator for real-time audio synthesis. Generates various waveforms (sine, square, sawtooth, triangle) and streams them to a USB speaker with smooth frequency and amplitude transitions. + +## Features + +- **Multiple waveforms**: sine, square, sawtooth, triangle +- **Smooth transitions**: Configurable glide (portamento), attack, and release times +- **Real-time control**: Change frequency, amplitude, and waveform on the fly +- **Efficient**: Pre-allocated buffers and NumPy vectorization +- **Thread-safe**: Safe to call from multiple threads + +## Usage + +### Basic Example + +```python +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_utils import App + +# Create wave generator with default settings +wave_gen = WaveGenerator() + +# Start generation +App.start_brick(wave_gen) + +# Control the generator +wave_gen.set_frequency(440.0) # A4 note +wave_gen.set_amplitude(0.8) # 80% amplitude + +# Keep app running +App.run() +``` + +### Advanced Configuration + +```python +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_utils import App + +# Create with custom settings +wave_gen = WaveGenerator( + sample_rate=16000, + wave_type="square", # Initial waveform + block_duration=0.03, # 30ms blocks + attack=0.01, # 10ms attack time + release=0.03, # 30ms release time + glide=0.02, # 20ms frequency glide +) + +App.start_brick(wave_gen) + +# Change waveform type +wave_gen.set_wave_type("triangle") + +# Set frequency and amplitude +wave_gen.set_frequency(880.0) # A5 note +wave_gen.set_amplitude(0.5) + +# Adjust envelope +wave_gen.set_envelope_params(attack=0.05, release=0.1, glide=0.05) + +App.run() +``` + +### Using Custom Speaker Configuration + +If you need specific device selection or audio format, create a Speaker externally: + +```python +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_peripherals.speaker import Speaker +from arduino.app_utils import App + +# Create Speaker with specific configuration +speaker = Speaker( + device=Speaker.USB_SPEAKER_2, # Select specific USB speaker + sample_rate=16000, + channels=1, + format="S16_LE" # Different audio format +) + +# Pass the speaker to WaveGenerator +wave_gen = WaveGenerator( + sample_rate=16000, + speaker=speaker, # Use custom-configured speaker + wave_type="sine" +) + +App.start_brick(wave_gen) + +# Control the generator +wave_gen.set_frequency(440.0) +wave_gen.set_amplitude(0.7) + +App.run() +# WaveGenerator automatically manages the speaker's lifecycle (start/stop) +``` + +### Theremin-Style Controller + +```python +import time +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_utils import App + +wave_gen = WaveGenerator(wave_type="sine", glide=0.02) +App.start_brick(wave_gen) + +def theremin_loop(): + """Simulate theremin-style frequency sweeps.""" + for freq in range(220, 880, 10): + wave_gen.set_frequency(float(freq)) + wave_gen.set_amplitude(0.7) + time.sleep(0.05) + + # Fade out + wave_gen.set_amplitude(0.0) + time.sleep(2) + +App.run(user_loop=theremin_loop) +``` + +### With WebUI Control + +```python +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_bricks.web_ui import WebUI +from arduino.app_utils import App + +wave_gen = WaveGenerator() +ui = WebUI() + +def on_frequency_change(sid, data): + freq = float(data.get('frequency', 440)) + wave_gen.set_frequency(freq) + +def on_amplitude_change(sid, data): + amp = float(data.get('amplitude', 0.5)) + wave_gen.set_amplitude(amp) + +def on_waveform_change(sid, data): + wave_type = data.get('wave_type', 'sine') + wave_gen.set_wave_type(wave_type) + +ui.on_message('set_frequency', on_frequency_change) +ui.on_message('set_amplitude', on_amplitude_change) +ui.on_message('set_waveform', on_waveform_change) + +App.run() +``` + +## API Reference + +### Constructor + +```python +WaveGenerator( + sample_rate: int = 16000, + wave_type: WaveType = "sine", + block_duration: float = 0.03, + attack: float = 0.01, + release: float = 0.03, + glide: float = 0.02, + speaker: Speaker = None, +) +``` + +**Parameters:** +- `sample_rate`: Audio sample rate in Hz (default: 16000) +- `wave_type`: Initial waveform - "sine", "square", "sawtooth", "triangle" (default: "sine") +- `block_duration`: Audio block duration in seconds (default: 0.03) +- `attack`: Amplitude attack time in seconds (default: 0.01) +- `release`: Amplitude release time in seconds (default: 0.03) +- `glide`: Frequency glide time (portamento) in seconds (default: 0.02) +- `speaker`: Pre-configured Speaker instance (optional). If None, a new Speaker will be created with default settings (auto-detect device, FLOAT_LE format, 1 channel, specified sample_rate). WaveGenerator always manages the speaker's lifecycle (calling start/stop) + +### Methods + +#### `set_frequency(frequency: float)` +Set target output frequency with smooth glide transition. + +**Parameters:** +- `frequency`: Target frequency in Hz (typically 20-8000 Hz) + +#### `set_amplitude(amplitude: float)` +Set target output amplitude with smooth attack/release. + +**Parameters:** +- `amplitude`: Target amplitude in range [0.0, 1.0] + +#### `set_wave_type(wave_type: WaveType)` +Change the waveform type. + +**Parameters:** +- `wave_type`: One of "sine", "square", "sawtooth", "triangle" + +#### `set_volume(volume: int)` +Set hardware speaker volume level. + +**Parameters:** +- `volume`: Hardware volume level (0-100) + +#### `get_volume() -> int` +Get current hardware speaker volume level. + +**Returns:** +- Current hardware volume level (0-100) + +#### `set_envelope_params(attack=None, release=None, glide=None)` +Update envelope parameters. + +**Parameters:** +- `attack`: Attack time in seconds (optional) +- `release`: Release time in seconds (optional) +- `glide`: Frequency glide time in seconds (optional) + +#### `get_state() -> dict` +Get current generator state. + +**Returns:** +- Dictionary with keys: `frequency`, `amplitude`, `wave_type`, `volume`, `phase` + +## Waveform Types + +### Sine Wave +Classic smooth sine wave, ideal for pure tones and musical applications. + +```python +wave_gen.set_wave_type("sine") +``` + +### Square Wave +Sharp square wave with odd harmonics, creates a "hollow" or "clarinet-like" sound. + +```python +wave_gen.set_wave_type("square") +``` + +### Sawtooth Wave +Bright sawtooth wave with all harmonics, creates a "buzzy" or "brassy" sound. + +```python +wave_gen.set_wave_type("sawtooth") +``` + +### Triangle Wave +Softer than square, contains only odd harmonics with lower amplitude. + +```python +wave_gen.set_wave_type("triangle") +``` + +## Envelope Parameters + +### Attack Time +Time to rise from current amplitude to target amplitude when increasing. + +**Typical values:** +- `0.001` - 1ms: Very fast, almost instant +- `0.01` - 10ms: Fast, percussive +- `0.1` - 100ms: Slow, pad-like + +### Release Time +Time to fall from current amplitude to target amplitude when decreasing. + +**Typical values:** +- `0.01` - 10ms: Short decay +- `0.05` - 50ms: Medium decay +- `0.5` - 500ms: Long decay, reverb-like + +### Glide Time (Portamento) +Time to smoothly transition from current frequency to target frequency. + +**Typical values:** +- `0.0` - Disabled: Instant frequency changes (may cause clicks) +- `0.005` - 5ms: Minimal, just removes clicks +- `0.02` - 20ms: Natural, smooth transitions (recommended) +- `0.05` - 50ms: Noticeable portamento effect +- `0.1+` - 100ms+: Very "slidey", theremin-like + +## Hardware Requirements + +- **Arduino UNO Q** (or compatible) +- **USB-C® hub** with external power +- **USB audio device** (USB speaker, wireless dongle, or USB-C → 3.5mm adapter) +- **Power supply** (5V, 3A) for USB hub + +**Note:** Must run in **Network Mode** or **SBC Mode** as the USB-C port is needed for the hub. + +## Troubleshooting + +### No Sound Output +- Check USB speaker is connected and powered +- Verify amplitude is > 0: `wave_gen.set_amplitude(0.5)` +- Check hardware volume: `wave_gen.set_volume(80)` + +### Choppy or Clicking Audio +- Increase glide time: `wave_gen.set_envelope_params(glide=0.05)` +- Reduce block duration for lower latency: `WaveGenerator(block_duration=0.02)` +- Close other CPU-intensive applications + +### "No USB speaker found" Error +- Ensure USB-C hub is connected with 5V/3A power supply +- Connect USB audio device to the hub +- Restart the application + +## License + +This brick is licensed under the Mozilla Public License 2.0 (MPL-2.0). + +Copyright (C) 2025 ARDUINO SA diff --git a/src/arduino/app_bricks/wave_generator/__init__.py b/src/arduino/app_bricks/wave_generator/__init__.py new file mode 100644 index 00000000..4af92e21 --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/__init__.py @@ -0,0 +1,7 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +from .wave_generator import * + +__all__ = ["WaveGenerator"] diff --git a/src/arduino/app_bricks/wave_generator/brick_config.yaml b/src/arduino/app_bricks/wave_generator/brick_config.yaml new file mode 100644 index 00000000..0d8ae661 --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/brick_config.yaml @@ -0,0 +1,4 @@ +id: arduino:wave_generator +name: Wave Generator +description: "Continuous wave generator for audio synthesis. Generates sine, square, sawtooth, and triangle waveforms with smooth frequency and amplitude transitions." +category: audio diff --git a/src/arduino/app_bricks/wave_generator/examples/01_basic_tone.py b/src/arduino/app_bricks/wave_generator/examples/01_basic_tone.py new file mode 100644 index 00000000..bc61b9fa --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/examples/01_basic_tone.py @@ -0,0 +1,34 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +""" +Basic Wave Generator Example + +Generates a simple 440Hz sine wave (A4 note) and demonstrates +basic frequency and amplitude control. +""" + +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_utils import App + +# Create wave generator with default settings +wave_gen = WaveGenerator( + sample_rate=16000, + wave_type="sine", + glide=0.02, # 20ms smooth frequency transitions +) + +# Start the generator +App.start_brick(wave_gen) + +# Set initial frequency and amplitude +wave_gen.set_frequency(440.0) # A4 note (440 Hz) +wave_gen.set_amplitude(0.7) # 70% amplitude +wave_gen.set_volume(80) # 80% hardware volume + +print("Playing 440Hz sine wave (A4 note)") +print("Press Ctrl+C to stop") + +# Keep the application running +App.run() diff --git a/src/arduino/app_bricks/wave_generator/examples/02_waveform_types.py b/src/arduino/app_bricks/wave_generator/examples/02_waveform_types.py new file mode 100644 index 00000000..ef2570e3 --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/examples/02_waveform_types.py @@ -0,0 +1,42 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +""" +Waveform Comparison Example + +Cycles through different waveform types to hear the difference +between sine, square, sawtooth, and triangle waves. +""" + +import time +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_utils import App + +wave_gen = WaveGenerator(sample_rate=16000, glide=0.02) +App.start_brick(wave_gen) + +# Set constant frequency and amplitude +wave_gen.set_frequency(440.0) +wave_gen.set_amplitude(0.6) + +waveforms = ["sine", "square", "sawtooth", "triangle"] + + +def cycle_waveforms(): + """Cycle through different waveform types.""" + for wave_type in waveforms: + print(f"Playing {wave_type} wave...") + wave_gen.set_wave_type(wave_type) + time.sleep(3) + + # Silence + wave_gen.set_amplitude(0.0) + time.sleep(2) + + +print("Cycling through waveforms:") +print("sine → square → sawtooth → triangle") +print("Press Ctrl+C to stop") + +App.run(user_loop=cycle_waveforms) diff --git a/src/arduino/app_bricks/wave_generator/examples/03_frequency_sweep.py b/src/arduino/app_bricks/wave_generator/examples/03_frequency_sweep.py new file mode 100644 index 00000000..8218d747 --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/examples/03_frequency_sweep.py @@ -0,0 +1,52 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +""" +Frequency Sweep Example + +Demonstrates smooth frequency transitions (glide/portamento effect) +by sweeping through different frequency ranges. +""" + +import time +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_utils import App + +wave_gen = WaveGenerator( + wave_type="sine", + glide=0.05, # 50ms glide for noticeable portamento +) + +App.start_brick(wave_gen) +wave_gen.set_amplitude(0.7) + + +def frequency_sweep(): + """Sweep through frequency ranges.""" + + # Low to high sweep + print("Sweeping low to high (220Hz → 880Hz)...") + for freq in range(220, 881, 20): + wave_gen.set_frequency(float(freq)) + time.sleep(0.1) + + time.sleep(0.5) + + # High to low sweep + print("Sweeping high to low (880Hz → 220Hz)...") + for freq in range(880, 219, -20): + wave_gen.set_frequency(float(freq)) + time.sleep(0.1) + + # Fade out + print("Fading out...") + wave_gen.set_amplitude(0.0) + time.sleep(2) + + +print("Frequency sweep demonstration") +print("Listen for smooth glide between frequencies") +print("Press Ctrl+C to stop") + +App.run(user_loop=frequency_sweep) diff --git a/src/arduino/app_bricks/wave_generator/examples/04_musical_scale.py b/src/arduino/app_bricks/wave_generator/examples/04_musical_scale.py new file mode 100644 index 00000000..565cf67d --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/examples/04_musical_scale.py @@ -0,0 +1,69 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +""" +Musical Scale Example + +Plays a musical scale (C major) demonstrating discrete note transitions +with smooth glide between notes. +""" + +import time +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_utils import App + +# Musical note frequencies (C major scale) +NOTES = { + "C4": 261.63, + "D4": 293.66, + "E4": 329.63, + "F4": 349.23, + "G4": 392.00, + "A4": 440.00, + "B4": 493.88, + "C5": 523.25, +} + +wave_gen = WaveGenerator( + wave_type="triangle", # Soft triangle wave + glide=0.03, # 30ms glide between notes + attack=0.01, + release=0.05, +) + +App.start_brick(wave_gen) +wave_gen.set_volume(70) + + +def play_scale(): + """Play C major scale up and down.""" + scale = ["C4", "D4", "E4", "F4", "G4", "A4", "B4", "C5"] + + # Ascending + print("Playing ascending scale...") + for note in scale: + print(f" {note}: {NOTES[note]:.2f} Hz") + wave_gen.set_frequency(NOTES[note]) + wave_gen.set_amplitude(0.7) + time.sleep(0.5) + + time.sleep(0.3) + + # Descending + print("Playing descending scale...") + for note in reversed(scale): + print(f" {note}: {NOTES[note]:.2f} Hz") + wave_gen.set_frequency(NOTES[note]) + wave_gen.set_amplitude(0.7) + time.sleep(0.5) + + # Fade out + wave_gen.set_amplitude(0.0) + time.sleep(2) + + +print("Musical Scale Demo - C Major") +print("Press Ctrl+C to stop") + +App.run(user_loop=play_scale) diff --git a/src/arduino/app_bricks/wave_generator/examples/05_envelope_control.py b/src/arduino/app_bricks/wave_generator/examples/05_envelope_control.py new file mode 100644 index 00000000..5ae54f53 --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/examples/05_envelope_control.py @@ -0,0 +1,63 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +""" +Envelope Control Example + +Demonstrates amplitude envelope control with different +attack and release times for various sonic effects. +""" + +import time +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_utils import App + +wave_gen = WaveGenerator(wave_type="sine") +App.start_brick(wave_gen) + +wave_gen.set_frequency(440.0) +wave_gen.set_volume(80) + + +def envelope_demo(): + """Demonstrate different envelope settings.""" + + # Fast attack, fast release (percussive) + print("1. Percussive (fast attack/release)...") + wave_gen.set_envelope_params(attack=0.001, release=0.01, glide=0.0) + wave_gen.set_amplitude(0.8) + time.sleep(0.5) + wave_gen.set_amplitude(0.0) + time.sleep(1) + + # Slow attack, fast release (pad-like) + print("2. Pad-like (slow attack, fast release)...") + wave_gen.set_envelope_params(attack=0.2, release=0.05, glide=0.0) + wave_gen.set_amplitude(0.8) + time.sleep(1) + wave_gen.set_amplitude(0.0) + time.sleep(1) + + # Fast attack, slow release (sustained) + print("3. Sustained (fast attack, slow release)...") + wave_gen.set_envelope_params(attack=0.01, release=0.3, glide=0.0) + wave_gen.set_amplitude(0.8) + time.sleep(0.5) + wave_gen.set_amplitude(0.0) + time.sleep(1.5) + + # Medium attack and release (balanced) + print("4. Balanced (medium attack/release)...") + wave_gen.set_envelope_params(attack=0.05, release=0.05, glide=0.0) + wave_gen.set_amplitude(0.8) + time.sleep(0.8) + wave_gen.set_amplitude(0.0) + time.sleep(2) + + +print("Envelope Control Demonstration") +print("Listen to different attack/release characteristics") +print("Press Ctrl+C to stop") + +App.run(user_loop=envelope_demo) diff --git a/src/arduino/app_bricks/wave_generator/examples/06_external_speaker.py b/src/arduino/app_bricks/wave_generator/examples/06_external_speaker.py new file mode 100644 index 00000000..db302fe2 --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/examples/06_external_speaker.py @@ -0,0 +1,67 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +""" +Custom Speaker Configuration Example + +Demonstrates how to use a pre-configured Speaker instance with WaveGenerator. +Use this approach when you need: +- Specific USB speaker selection (USB_SPEAKER_2, etc.) +- Different audio format (S16_LE, etc.) +- Explicit device name ("plughw:CARD=Device,DEV=0") +""" + +import time +from arduino.app_bricks.wave_generator import WaveGenerator +from arduino.app_peripherals.speaker import Speaker +from arduino.app_utils import App + +# List available USB speakers +available_speakers = Speaker.list_usb_devices() +print(f"Available USB speakers: {available_speakers}") + +# Create and configure a Speaker with specific parameters +speaker = Speaker( + device=Speaker.USB_SPEAKER_1, # or None for auto-detect, or specific device + sample_rate=16000, + channels=1, + format="FLOAT_LE", +) + +# Create WaveGenerator with the external speaker +# WaveGenerator will manage the speaker's lifecycle (start/stop) +wave_gen = WaveGenerator( + sample_rate=16000, + speaker=speaker, # Pass pre-configured speaker + wave_type="sine", + glide=0.02, +) + +# Start the WaveGenerator (which will also start the speaker) +App.start_brick(wave_gen) + + +def play_sequence(): + """Play a simple frequency sequence.""" + frequencies = [261.63, 293.66, 329.63, 349.23, 392.00, 440.00, 493.88, 523.25] # C4 to C5 + note_names = ["C4", "D4", "E4", "F4", "G4", "A4", "B4", "C5"] + + for freq, name in zip(frequencies, note_names): + print(f"Playing {name} ({freq:.2f} Hz)") + wave_gen.set_frequency(freq) + wave_gen.set_amplitude(0.7) + time.sleep(0.5) + + # Fade out + wave_gen.set_amplitude(0.0) + time.sleep(1) + + +print("Playing musical scale with external speaker...") +print("Press Ctrl+C to stop") + +App.run(user_loop=play_sequence) + +# WaveGenerator automatically stops the speaker when it stops +print("Done") diff --git a/src/arduino/app_bricks/wave_generator/wave_generator.py b/src/arduino/app_bricks/wave_generator/wave_generator.py new file mode 100644 index 00000000..47b6fd41 --- /dev/null +++ b/src/arduino/app_bricks/wave_generator/wave_generator.py @@ -0,0 +1,390 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +import logging +import math +import threading +import time +import numpy as np +from typing import Literal +from arduino.app_utils import Logger, brick +from arduino.app_peripherals.speaker import Speaker + +logger = Logger("WaveGenerator", logging.INFO) + + +WaveType = Literal["sine", "square", "sawtooth", "triangle"] + + +@brick +class WaveGenerator: + """Continuous wave generator brick for audio synthesis. + + This brick generates continuous audio waveforms (sine, square, sawtooth, triangle) + and streams them to a USB speaker in real-time. It provides smooth transitions + between frequency and amplitude changes using configurable envelope parameters. + + The generator runs continuously in a background thread, producing audio blocks + at a steady rate with minimal latency. + + Attributes: + sample_rate (int): Audio sample rate in Hz (default: 16000). + wave_type (WaveType): Type of waveform to generate. + frequency (float): Current output frequency in Hz. + amplitude (float): Current output amplitude (0.0-1.0). + """ + + def __init__( + self, + sample_rate: int = 16000, + wave_type: WaveType = "sine", + block_duration: float = 0.03, + attack: float = 0.01, + release: float = 0.03, + glide: float = 0.02, + speaker: Speaker = None, + ): + """Initialize the WaveGenerator brick. + + Args: + sample_rate (int): Audio sample rate in Hz (default: 16000). + wave_type (WaveType): Initial waveform type (default: "sine"). + block_duration (float): Duration of each audio block in seconds (default: 0.03). + attack (float): Attack time for amplitude envelope in seconds (default: 0.01). + release (float): Release time for amplitude envelope in seconds (default: 0.03). + glide (float): Frequency glide time (portamento) in seconds (default: 0.02). + speaker (Speaker, optional): Pre-configured Speaker instance. If None, a new Speaker + will be created with default settings (auto-detect device, FLOAT_LE format). + WaveGenerator will manage the speaker's lifecycle (calling start/stop). + + Raises: + SpeakerException: If no USB speaker is found or device is busy. + """ + self.sample_rate = int(sample_rate) + self.block_duration = float(block_duration) + self.wave_type = wave_type + + # Envelope parameters + self.attack = float(attack) + self.release = float(release) + self.glide = float(glide) + + # Target state (updated by user) + self._target_freq = 440.0 + self._target_amp = 0.0 + + # Current state (internal, smoothed) + self._current_freq = 440.0 + self._current_amp = 0.0 + self._phase = 0.0 + + # Pre-allocated buffers + self._buf_N = 0 + self._buf_phase_incs = None + self._buf_phases = None + self._buf_envelope = None + self._buf_samples = None + + # Speaker setup + if speaker is not None: + # Use externally provided Speaker instance + self._speaker = speaker + logger.info("Using externally provided Speaker instance") + else: + # Create internal Speaker instance with default settings + self._speaker = Speaker( + device=None, # Auto-detect first available USB speaker + sample_rate=sample_rate, + channels=1, + format="FLOAT_LE", + ) + logger.info( + "Created internal Speaker: device=auto-detect, sample_rate=%d, format=FLOAT_LE", + sample_rate, + ) + + # Producer thread control + self._running = threading.Event() + self._producer_thread = None + self._state_lock = threading.Lock() + + logger.info( + "WaveGenerator initialized: sample_rate=%d, wave_type=%s, block_dur=%.3fs", + sample_rate, + wave_type, + block_duration, + ) + + def start(self): + """Start the wave generator and audio output. + + This starts the speaker device and launches the producer thread that + continuously generates and streams audio blocks. + """ + if self._running.is_set(): + logger.warning("WaveGenerator is already running") + return + + logger.info("Starting WaveGenerator...") + self._speaker.start() + + # Set hardware speaker volume to maximum (100%) + try: + self._speaker.set_volume(100) + logger.info("Speaker hardware volume set to 100%") + except Exception as e: + logger.warning(f"Could not set speaker volume: {e}") + + self._running.set() + + self._producer_thread = threading.Thread(target=self._producer_loop, daemon=True, name="WaveGenerator-Producer") + self._producer_thread.start() + + logger.info("WaveGenerator started") + + def stop(self): + """Stop the wave generator and audio output. + + This stops the producer thread and closes the speaker device. + """ + if not self._running.is_set(): + logger.warning("WaveGenerator is not running") + return + + logger.info("Stopping WaveGenerator...") + self._running.clear() + + if self._producer_thread: + self._producer_thread.join(timeout=5) + if self._producer_thread.is_alive(): + logger.warning("Producer thread did not terminate in time") + self._producer_thread = None + + self._speaker.stop() + logger.info("WaveGenerator stopped") + + def set_frequency(self, frequency: float): + """Set the target output frequency. + + The frequency will smoothly transition to the new value over the + configured glide time. + + Args: + frequency (float): Target frequency in Hz (typically 20-8000 Hz). + """ + with self._state_lock: + self._target_freq = float(max(0.0, frequency)) + + def set_amplitude(self, amplitude: float): + """Set the target output amplitude. + + The amplitude will smoothly transition to the new value over the + configured attack/release time. + + Args: + amplitude (float): Target amplitude in range [0.0, 1.0]. + """ + with self._state_lock: + self._target_amp = float(max(0.0, min(1.0, amplitude))) + + def set_wave_type(self, wave_type: WaveType): + """Change the waveform type. + + Args: + wave_type (WaveType): One of "sine", "square", "sawtooth", "triangle". + + Raises: + ValueError: If wave_type is not valid. + """ + valid_types = ["sine", "square", "sawtooth", "triangle"] + if wave_type not in valid_types: + raise ValueError(f"Invalid wave_type '{wave_type}'. Must be one of {valid_types}") + + with self._state_lock: + self.wave_type = wave_type + logger.info(f"Wave type changed to: {wave_type}") + + def set_volume(self, volume: int): + """Set the speaker volume level. + + This is a wrapper that controls the hardware volume of the USB speaker device. + + Args: + volume (int): Hardware volume level (0-100). + + Raises: + SpeakerException: If the mixer is not available or if volume cannot be set. + """ + self._speaker.set_volume(volume) + logger.info(f"Speaker volume set to {volume}%") + + def get_volume(self) -> int: + """Get the current speaker volume level. + + Returns: + int: Current hardware volume level (0-100). + """ + try: + return self._speaker._mixer.getvolume()[0] if self._speaker._mixer else 100 + except Exception: + return 100 + + def set_envelope_params(self, attack: float = None, release: float = None, glide: float = None): + """Update envelope parameters. + + Args: + attack (float, optional): Attack time in seconds. + release (float, optional): Release time in seconds. + glide (float, optional): Frequency glide time in seconds. + """ + with self._state_lock: + if attack is not None: + self.attack = float(max(0.0, attack)) + if release is not None: + self.release = float(max(0.0, release)) + if glide is not None: + self.glide = float(max(0.0, glide)) + + def get_state(self) -> dict: + """Get current generator state. + + Returns: + dict: Dictionary containing current frequency, amplitude, wave type, etc. + """ + with self._state_lock: + return { + "frequency": self._current_freq, + "amplitude": self._current_amp, + "wave_type": self.wave_type, + "volume": self.get_volume(), + "phase": self._phase, + } + + def _producer_loop(self): + """Main producer loop running in background thread. + + Continuously generates audio blocks at a steady cadence and streams + them to the speaker device. + """ + logger.debug("Producer loop started") + next_time = time.perf_counter() + block_count = 0 + + while self._running.is_set(): + next_time += self.block_duration + + # Read target state + with self._state_lock: + target_freq = self._target_freq + target_amp = self._target_amp + wave_type = self.wave_type + + # Log every 100 blocks or when amplitude changes + block_count += 1 + if block_count % 100 == 0 or (block_count < 5): + logger.debug(f"Producer: block={block_count}, freq={target_freq:.1f}Hz, amp={target_amp:.3f}") + + # Generate audio block + try: + audio_block = self._generate_block(target_freq, target_amp, wave_type) + self._speaker.play(audio_block, block_on_queue=False) + except Exception as e: + logger.error(f"Error generating audio block: {e}") + + # Wait until next scheduled time + now = time.perf_counter() + sleep_time = next_time - now + if sleep_time > 0: + time.sleep(sleep_time) + else: + # We're falling behind, reset timing + next_time = now + + logger.debug("Producer loop terminated") + + def _generate_block(self, freq_target: float, amp_target: float, wave_type: str) -> np.ndarray: + """Generate a single audio block. + + Args: + freq_target (float): Target frequency in Hz. + amp_target (float): Target amplitude (0.0-1.0). + wave_type (str): Waveform type. + + Returns: + np.ndarray: Audio samples as float32 array. + """ + N = max(1, int(self.sample_rate * self.block_duration)) + + # Ensure buffers are allocated + if N > self._buf_N: + self._buf_N = N + self._buf_phase_incs = np.empty(self._buf_N, dtype=np.float32) + self._buf_phases = np.empty(self._buf_N, dtype=np.float32) + self._buf_envelope = np.empty(self._buf_N, dtype=np.float32) + self._buf_samples = np.empty(self._buf_N, dtype=np.float32) + + phases = self._buf_phases[:N] + envelope = self._buf_envelope[:N] + samples = self._buf_samples[:N] + + # === AMPLITUDE SMOOTHING === + amp_current = self._current_amp + if amp_target == amp_current or (self.attack <= 0.0 and self.release <= 0.0): + envelope.fill(amp_target) + else: + ramp = self.attack if amp_target > amp_current else self.release + if ramp <= 0.0: + envelope.fill(amp_target) + else: + frac = min(1.0, self.block_duration / ramp) + next_amp = amp_current + (amp_target - amp_current) * frac + envelope[:] = np.linspace(amp_current, next_amp, N, dtype=np.float32) + amp_current = float(envelope[-1]) + + # === FREQUENCY GLIDE (PORTAMENTO) === + freq_current = self._current_freq + phase_incs = self._buf_phase_incs[:N] + + if self.glide > 0.0 and freq_current != freq_target: + # Apply glide smoothing over time + frac = min(1.0, self.block_duration / self.glide) + next_freq = freq_current + (freq_target - freq_current) * frac + + # Linear interpolation within block + freq_ramp = np.linspace(freq_current, next_freq, N, dtype=np.float32) + phase_incs[:] = 2.0 * math.pi * freq_ramp / float(self.sample_rate) + + freq_current = float(next_freq) + else: + # No glide or already at target + phase_incr = 2.0 * math.pi * freq_target / float(self.sample_rate) + phase_incs.fill(phase_incr) + freq_current = freq_target + + # === PHASE ACCUMULATION === + np.cumsum(phase_incs, dtype=np.float32, out=phases) + phases += self._phase + self._phase = float(phases[-1] % (2.0 * math.pi)) + + # === WAVEFORM GENERATION === + if wave_type == "sine": + np.sin(phases, out=samples) + elif wave_type == "square": + samples[:] = np.where(np.sin(phases) >= 0, 1.0, -1.0) + elif wave_type == "sawtooth": + samples[:] = 2.0 * (phases / (2.0 * math.pi) % 1.0) - 1.0 + elif wave_type == "triangle": + samples[:] = 2.0 * np.abs(2.0 * (phases / (2.0 * math.pi) % 1.0) - 1.0) - 1.0 + else: + # Fallback to sine + np.sin(phases, out=samples) + + # === APPLY ENVELOPE AND GAIN === + np.multiply(samples, envelope, out=samples) + + # Update internal state + self._current_amp = amp_current + self._current_freq = freq_current + + return samples diff --git a/src/arduino/app_peripherals/speaker/__init__.py b/src/arduino/app_peripherals/speaker/__init__.py index 881145e1..f3afa62c 100644 --- a/src/arduino/app_peripherals/speaker/__init__.py +++ b/src/arduino/app_peripherals/speaker/__init__.py @@ -227,27 +227,24 @@ def _open_pcm(self): raise SpeakerException(f"Unexpected error opening spaker: {e}") def _load_mixer(self) -> alsaaudio.Mixer: + """Load the Headset mixer for volume control.""" try: cards = alsaaudio.cards() card_indexes = alsaaudio.card_indexes() for card_name, card_index in zip(cards, card_indexes): - logger.debug(f"Checking Card {card_name} (index {card_index}, device {self.device})") if f"CARD={card_name}," in self.device: try: - mixer = alsaaudio.mixers(cardindex=card_index) - if len(mixer) == 0: - logger.warning(f"No mixers found for card {card_name}.") - continue - mx = alsaaudio.Mixer(mixer[0]) - logger.debug(f"Loaded mixer: {mixer[0]} for card {card_name}") - return mx + mx = alsaaudio.Mixer("Headset", cardindex=card_index) + if mx.volumecap(): + logger.info(f"Loaded Headset mixer for card {card_name}") + return mx except alsaaudio.ALSAAudioError as e: - logger.debug(f"Failed to load mixer for card {card_name}: {e}") + logger.debug(f"Failed to load Headset mixer for card {card_name}: {e}") + return None - # No suitable mixer found, return None return None except alsaaudio.ALSAAudioError as e: - logger.warning(f"Error loading mixer {self.device}: {e}") + logger.warning(f"Error loading mixer for {self.device}: {e}") return None def get_volume(self) -> int: diff --git a/tests/arduino/app_bricks/wave_generator/test_wave_generator.py b/tests/arduino/app_bricks/wave_generator/test_wave_generator.py new file mode 100644 index 00000000..7deacae0 --- /dev/null +++ b/tests/arduino/app_bricks/wave_generator/test_wave_generator.py @@ -0,0 +1,485 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +import pytest +import numpy as np +import threading +import time +from arduino.app_bricks.wave_generator import WaveGenerator +import arduino.app_utils.app as app +from arduino.app_utils import AppController + + +@pytest.fixture +def app_instance(monkeypatch): + """Provides a fresh AppController instance for each test.""" + instance = AppController() + monkeypatch.setattr(app, "App", instance) + return instance + + +@pytest.fixture(autouse=True) +def mock_speaker(monkeypatch): + """Mock Speaker to avoid hardware dependencies.""" + + class FakeSpeaker: + def __init__(self, device=None, sample_rate=16000, channels=1, format="FLOAT_LE"): + self.device = device or "fake_device" + self.sample_rate = sample_rate + self.channels = channels + self.format = format + self._is_started = False + self._played_data = [] + self._mixer = FakeMixer() + + def start(self): + self._is_started = True + + def stop(self): + self._is_started = False + + def play(self, data, block_on_queue=False): + if self._is_started: + self._played_data.append(data) + + def set_volume(self, volume: int): + self._mixer.setvolume(volume) + + def is_started(self): + return self._is_started + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() + return False + + class FakeMixer: + def __init__(self): + self._volume = 100 + + def setvolume(self, volume: int): + self._volume = max(0, min(100, volume)) + + def getvolume(self): + return [self._volume] + + # Patch Speaker in the wave_generator module + monkeypatch.setattr("arduino.app_bricks.wave_generator.wave_generator.Speaker", FakeSpeaker) + return FakeSpeaker + + +def test_wave_generator_initialization_default(mock_speaker): + """Test WaveGenerator initializes with default parameters.""" + wave_gen = WaveGenerator() + + assert wave_gen.sample_rate == 16000 + assert wave_gen.wave_type == "sine" + assert wave_gen.block_duration == 0.03 + assert wave_gen.attack == 0.01 + assert wave_gen.release == 0.03 + assert wave_gen.glide == 0.02 + assert wave_gen._speaker is not None + assert wave_gen._speaker.sample_rate == 16000 + + +def test_wave_generator_initialization_custom(mock_speaker): + """Test WaveGenerator initializes with custom parameters.""" + wave_gen = WaveGenerator( + sample_rate=48000, + wave_type="square", + block_duration=0.05, + attack=0.02, + release=0.05, + glide=0.03, + ) + + assert wave_gen.sample_rate == 48000 + assert wave_gen.wave_type == "square" + assert wave_gen.block_duration == 0.05 + assert wave_gen.attack == 0.02 + assert wave_gen.release == 0.05 + assert wave_gen.glide == 0.03 + assert wave_gen._speaker.sample_rate == 48000 + + +def test_wave_generator_with_external_speaker(mock_speaker): + """Test WaveGenerator with externally provided Speaker.""" + external_speaker = mock_speaker(device="external_device", sample_rate=16000) + wave_gen = WaveGenerator(speaker=external_speaker) + + assert wave_gen._speaker is external_speaker + assert wave_gen._speaker.device == "external_device" + + +def test_wave_generator_start_stop(app_instance, mock_speaker): + """Test WaveGenerator start and stop methods.""" + wave_gen = WaveGenerator() + + # Initially not running + assert not wave_gen._running.is_set() + + # Start the generator + wave_gen.start() + assert wave_gen._running.is_set() + assert wave_gen._speaker.is_started() + assert wave_gen._producer_thread is not None + assert wave_gen._producer_thread.is_alive() + + time.sleep(0.1) # Let it run briefly + + # Stop the generator + wave_gen.stop() + assert not wave_gen._running.is_set() + assert not wave_gen._speaker.is_started() + + # Wait for thread to finish + time.sleep(0.1) + + +def test_set_frequency(mock_speaker): + """Test setting frequency.""" + wave_gen = WaveGenerator() + + wave_gen.set_frequency(440.0) + assert wave_gen._target_freq == 440.0 + + wave_gen.set_frequency(880.0) + assert wave_gen._target_freq == 880.0 + + # Test negative frequency (should be clamped to 0) + wave_gen.set_frequency(-100.0) + assert wave_gen._target_freq == 0.0 + + +def test_set_amplitude(mock_speaker): + """Test setting amplitude.""" + wave_gen = WaveGenerator() + + wave_gen.set_amplitude(0.5) + assert wave_gen._target_amp == 0.5 + + wave_gen.set_amplitude(1.0) + assert wave_gen._target_amp == 1.0 + + # Test out of range (should be clamped) + wave_gen.set_amplitude(1.5) + assert wave_gen._target_amp == 1.0 + + wave_gen.set_amplitude(-0.5) + assert wave_gen._target_amp == 0.0 + + +def test_set_wave_type(mock_speaker): + """Test setting wave type.""" + wave_gen = WaveGenerator() + + wave_gen.set_wave_type("sine") + assert wave_gen.wave_type == "sine" + + wave_gen.set_wave_type("square") + assert wave_gen.wave_type == "square" + + wave_gen.set_wave_type("sawtooth") + assert wave_gen.wave_type == "sawtooth" + + wave_gen.set_wave_type("triangle") + assert wave_gen.wave_type == "triangle" + + # Test invalid wave type + with pytest.raises(ValueError): + wave_gen.set_wave_type("invalid") + + +def test_set_volume(mock_speaker): + """Test setting hardware volume.""" + wave_gen = WaveGenerator() + + wave_gen.set_volume(70) + assert wave_gen._speaker._mixer._volume == 70 + + wave_gen.set_volume(100) + assert wave_gen._speaker._mixer._volume == 100 + + # Test get_volume + assert wave_gen.get_volume() == 100 + + wave_gen.set_volume(50) + assert wave_gen.get_volume() == 50 + + +def test_set_envelope_params(mock_speaker): + """Test setting envelope parameters.""" + wave_gen = WaveGenerator() + + wave_gen.set_envelope_params(attack=0.05) + assert wave_gen.attack == 0.05 + + wave_gen.set_envelope_params(release=0.1) + assert wave_gen.release == 0.1 + + wave_gen.set_envelope_params(glide=0.04) + assert wave_gen.glide == 0.04 + + # Test all at once + wave_gen.set_envelope_params(attack=0.02, release=0.06, glide=0.03) + assert wave_gen.attack == 0.02 + assert wave_gen.release == 0.06 + assert wave_gen.glide == 0.03 + + # Test negative values (should be clamped to 0) + wave_gen.set_envelope_params(attack=-0.01) + assert wave_gen.attack == 0.0 + + +def test_get_state(mock_speaker): + """Test getting current generator state.""" + wave_gen = WaveGenerator() + + wave_gen.set_frequency(440.0) + wave_gen.set_amplitude(0.8) + wave_gen.set_wave_type("square") + wave_gen.set_volume(90) + + state = wave_gen.get_state() + + assert "frequency" in state + assert "amplitude" in state + assert "wave_type" in state + assert state["wave_type"] == "square" + assert "volume" in state + assert state["volume"] == 90 + assert "phase" in state + + +def test_generate_block_sine(mock_speaker): + """Test generating a sine wave block.""" + wave_gen = WaveGenerator(sample_rate=16000) + + # Generate a block + block = wave_gen._generate_block(freq_target=440.0, amp_target=0.5, wave_type="sine") + + # Check block properties + assert isinstance(block, np.ndarray) + assert block.dtype == np.float32 + expected_samples = int(16000 * 0.03) # block_duration = 0.03 + assert len(block) == expected_samples + # Check amplitude is within range + assert np.max(np.abs(block)) <= 0.5 + + +def test_generate_block_square(mock_speaker): + """Test generating a square wave block.""" + wave_gen = WaveGenerator(sample_rate=16000) + + block = wave_gen._generate_block(freq_target=440.0, amp_target=0.5, wave_type="square") + + assert isinstance(block, np.ndarray) + # Square wave has envelope applied, so check amplitude range + assert np.max(np.abs(block)) <= 0.5 + + +def test_generate_block_sawtooth(mock_speaker): + """Test generating a sawtooth wave block.""" + wave_gen = WaveGenerator(sample_rate=16000) + + _ = wave_gen._generate_block(freq_target=440.0, amp_target=0.5, wave_type="sawtooth") + + # Verify internal state updated correctly + assert wave_gen._buf_samples is not None + + +def test_generate_block_triangle(mock_speaker): + """Test generating a triangle wave block.""" + wave_gen = WaveGenerator(sample_rate=16000) + + _ = wave_gen._generate_block(freq_target=440.0, amp_target=0.5, wave_type="triangle") + + # Verify internal state updated correctly + assert wave_gen._buf_samples is not None + + +def test_frequency_glide(mock_speaker): + """Test frequency glide (portamento) effect.""" + wave_gen = WaveGenerator(sample_rate=16000, glide=0.1) + + # Set initial frequency + wave_gen._current_freq = 220.0 + + # Generate block with new target frequency + _ = wave_gen._generate_block(freq_target=440.0, amp_target=0.5, wave_type="sine") + + # Current frequency should have moved towards target but not reached it + # (because glide time is longer than block duration) + assert wave_gen._current_freq > 220.0 + assert wave_gen._current_freq < 440.0 + + +def test_amplitude_envelope(mock_speaker): + """Test amplitude envelope (attack/release).""" + wave_gen = WaveGenerator(sample_rate=16000, attack=0.1, release=0.1) + + # Set initial amplitude + wave_gen._current_amp = 0.0 + + # Generate block with new target amplitude + _ = wave_gen._generate_block(freq_target=440.0, amp_target=0.8, wave_type="sine") + + # Current amplitude should have moved towards target but not reached it + assert wave_gen._current_amp > 0.0 + assert wave_gen._current_amp < 0.8 + + +def test_producer_loop_generates_audio(app_instance, mock_speaker): + """Test that producer loop generates and plays audio.""" + wave_gen = WaveGenerator() + + wave_gen.set_frequency(440.0) + wave_gen.set_amplitude(0.5) + wave_gen.start() + + # Let it run for a bit + time.sleep(0.2) + + # Check that audio was played + assert len(wave_gen._speaker._played_data) > 0 + + wave_gen.stop() + + +def test_thread_safety(mock_speaker): + """Test thread-safe access to parameters.""" + wave_gen = WaveGenerator() + + def set_params(): + for i in range(100): + wave_gen.set_frequency(440.0 + i) + wave_gen.set_amplitude(0.5) + time.sleep(0.001) + + def get_state(): + for i in range(100): + state = wave_gen.get_state() + assert "frequency" in state + time.sleep(0.001) + + wave_gen.start() + + # Start multiple threads accessing the generator + threads = [ + threading.Thread(target=set_params), + threading.Thread(target=get_state), + ] + + for t in threads: + t.start() + + for t in threads: + t.join(timeout=5) + + wave_gen.stop() + + +def test_buffer_preallocation(mock_speaker): + """Test that buffers are pre-allocated and reused.""" + wave_gen = WaveGenerator(sample_rate=16000) + + # Generate first block + block1 = wave_gen._generate_block(freq_target=440.0, amp_target=0.5, wave_type="sine") + + # Check buffers are allocated + assert wave_gen._buf_N > 0 + assert wave_gen._buf_phase_incs is not None + assert wave_gen._buf_phases is not None + assert wave_gen._buf_envelope is not None + assert wave_gen._buf_samples is not None + + # Generate second block + _ = wave_gen._generate_block(freq_target=440.0, amp_target=0.5, wave_type="sine") + + # Buffers should still be the same size (reused) + assert wave_gen._buf_N == len(block1) + + +def test_phase_continuity(mock_speaker): + """Test that phase is continuous across blocks.""" + wave_gen = WaveGenerator(sample_rate=16000) + + initial_phase = wave_gen._phase + + # Generate multiple blocks + for _ in range(10): + wave_gen._generate_block(freq_target=440.0, amp_target=0.5, wave_type="sine") + + # Phase should have advanced + assert wave_gen._phase != initial_phase + # Phase should be wrapped to [0, 2π] + assert 0.0 <= wave_gen._phase < 2 * np.pi + + +def test_zero_amplitude_produces_silence(mock_speaker): + """Test that zero amplitude produces silent output.""" + wave_gen = WaveGenerator(sample_rate=16000) + + block = wave_gen._generate_block(freq_target=440.0, amp_target=0.0, wave_type="sine") + + # All samples should be zero or very close to zero + assert np.allclose(block, 0.0, atol=1e-6) + + +def test_app_controller_integration(app_instance, mock_speaker): + """Test integration with AppController (start/stop via App).""" + wave_gen = WaveGenerator() + + # Register manually to avoid auto-registration + app_instance.unregister(wave_gen) + app_instance.start_brick(wave_gen) + + assert wave_gen._running.is_set() + assert wave_gen._speaker.is_started() + + time.sleep(0.1) + + app_instance.stop_brick(wave_gen) + + assert not wave_gen._running.is_set() + assert not wave_gen._speaker.is_started() + + +def test_multiple_start_stop_cycles(app_instance, mock_speaker): + """Test starting and stopping multiple times.""" + wave_gen = WaveGenerator() + + for _ in range(3): + wave_gen.start() + assert wave_gen._running.is_set() + time.sleep(0.05) + + wave_gen.stop() + assert not wave_gen._running.is_set() + time.sleep(0.05) + + +def test_double_start_warning(app_instance, mock_speaker): + """Test that starting an already running generator logs a warning.""" + wave_gen = WaveGenerator() + + wave_gen.start() + assert wave_gen._running.is_set() + + # Try to start again (should warn but not crash) + wave_gen.start() + assert wave_gen._running.is_set() + + wave_gen.stop() + + +def test_double_stop_warning(app_instance, mock_speaker): + """Test that stopping a non-running generator logs a warning.""" + wave_gen = WaveGenerator() + + # Try to stop before starting (should warn but not crash) + wave_gen.stop() + assert not wave_gen._running.is_set()