From bf46a45f898b4cd75be698475048196812535060 Mon Sep 17 00:00:00 2001 From: James Ketrenos Date: Mon, 1 Sep 2025 19:32:57 -0700 Subject: [PATCH] Messing with audio --- voicebot/synthetic_media.py | 288 ++++++++++++++++++++++-------------- 1 file changed, 173 insertions(+), 115 deletions(-) diff --git a/voicebot/synthetic_media.py b/voicebot/synthetic_media.py index e6c938e..c157084 100644 --- a/voicebot/synthetic_media.py +++ b/voicebot/synthetic_media.py @@ -6,17 +6,36 @@ Contains AnimatedVideoTrack and SyntheticAudioTrack implementations ported from """ import numpy as np +import math import cv2 import fractions import time import random -from typing import TypedDict +from av.audio.frame import AudioFrame +from asyncio import Queue, create_task, sleep +from typing import TypedDict, TYPE_CHECKING from aiortc import MediaStreamTrack -from av import VideoFrame, AudioFrame +from av import VideoFrame from logger import logger +if TYPE_CHECKING: + pass + +# Shared clock +from time import perf_counter + + +class MediaClock: + def __init__(self): + self.t0 = perf_counter() + + def now(self) -> float: + return perf_counter() - self.t0 + + class BounceEvent(TypedDict): """Type definition for bounce events""" + type: str start_sample: int end_sample: int @@ -35,6 +54,7 @@ class AnimatedVideoTrack(MediaStreamTrack): def __init__( self, + clock: MediaClock, width: int = 320, height: int = 240, name: str = "", @@ -44,6 +64,10 @@ class AnimatedVideoTrack(MediaStreamTrack): self.width = width self.height = height self.name = name + self.clock = clock + self.fps = 15 + self._next_frame_index = 0 + self.audio_track = audio_track # Reference to the audio track self.remote_video_tracks: list[ MediaStreamTrack @@ -73,6 +97,10 @@ class AnimatedVideoTrack(MediaStreamTrack): self._start_time = time.time() self._last_frame_time = time.time() self.fps = 15 # Target frames per second + self._remote_latest = {} # track -> np.ndarray + self._remote_tasks: list[ + tuple[MediaStreamTrack, object, Queue[np.ndarray]] + ] = [] def set_ball_speed(self, speed_mps: float): """Set the ball speed in meters per second""" @@ -83,6 +111,19 @@ class AnimatedVideoTrack(MediaStreamTrack): if track.kind == "video": self.remote_video_tracks.append(track) logger.info(f"Added remote video track: {track}") + q: Queue[np.ndarray] = Queue(maxsize=1) + + async def pump(): + while True: + frame = await track.recv() + if isinstance(frame, VideoFrame): + img: np.ndarray = frame.to_ndarray(format="bgr24") + if q.full(): + _ = q.get_nowait() + await q.put(img) + + t = create_task(pump()) + self._remote_tasks.append((track, t, q)) def remove_remote_video_track(self, track: MediaStreamTrack): """Remove a remote video track""" @@ -90,36 +131,16 @@ class AnimatedVideoTrack(MediaStreamTrack): self.remote_video_tracks.remove(track) logger.info(f"Removed remote video track: {track}") - def _calculate_velocity_components(self) -> tuple[float, float]: - """ - Calculate dx and dy velocity components based on speed in meters per second. 
- Frame width represents 1 meter, so pixels per second = width * speed_mps - """ - # Calculate actual time delta since last frame - current_time = time.time() - dt = current_time - self._last_frame_time - self._last_frame_time = current_time - - # Normalize direction vector to ensure consistent speed - dir_x = self.ball["direction_x"] - dir_y = self.ball["direction_y"] - magnitude = np.sqrt(dir_x * dir_x + dir_y * dir_y) - - if magnitude > 0: - dir_x_norm = dir_x / magnitude - dir_y_norm = dir_y / magnitude - else: + def _calculate_velocity_components(self, dt: float) -> tuple[float, float]: + dir_x, dir_y = self.ball["direction_x"], self.ball["direction_y"] + mag = np.hypot(dir_x, dir_y) + if mag == 0: dir_x_norm, dir_y_norm = 1.0, 0.0 - - # Convert meters per second to pixels per actual time delta + else: + dir_x_norm, dir_y_norm = dir_x / mag, dir_y / mag pixels_per_second = self.width * self.ball["speed_mps"] pixels_this_frame = pixels_per_second * dt - - # Apply normalized direction to get velocity components - dx = pixels_this_frame * dir_x_norm - dy = pixels_this_frame * dir_y_norm - - return dx, dy + return pixels_this_frame * dir_x_norm, pixels_this_frame * dir_y_norm async def next_timestamp(self): """Returns (pts, time_base) for 15 FPS video""" @@ -171,42 +192,36 @@ class AnimatedVideoTrack(MediaStreamTrack): """Generate video frames at 15 FPS""" pts, time_base = await self.next_timestamp() + # Target timestamp for this frame (seconds since t0) + target_t = self._next_frame_index / self.fps + now = self.clock.now() + if target_t > now: + await sleep(target_t - now) + + # Use constant dt tied to fps (prevents physics jitter) + dt = 1.0 / self.fps + dx, dy = self._calculate_velocity_components(dt) + + # PTS derived from frame index, not wall clock + pts = int(self._next_frame_index * (90000 / self.fps)) + time_base = 1 / 90000 + + self._next_frame_index += 1 + # Create black background frame_array = np.zeros((self.height, self.width, 3), dtype=np.uint8) # Process remote video tracks with edge detection - for track in self.remote_video_tracks: + for _track, _task, q in self._remote_tasks: try: - # Get the latest frame from the remote track (non-blocking) - remote_frame = await track.recv() - if remote_frame and isinstance(remote_frame, VideoFrame): - # Convert to numpy array - img: np.ndarray = remote_frame.to_ndarray(format="bgr24") - - # Apply edge detection - edges = cv2.Canny(img, 100, 200) - img_edges = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR) - - # Resize to match our canvas size if needed - if img_edges.shape[:2] != (self.height, self.width): - img_edges = cv2.resize(img_edges, (self.width, self.height)) - - # Blend with existing frame (additive blend for edge detection overlay) - frame_array = cv2.addWeighted( - frame_array.astype(np.uint8), - 0.7, - img_edges.astype(np.uint8), - 0.3, - 0, - ) - - except Exception as e: - # If we can't get a frame from this track, continue with others - logger.debug(f"Could not get frame from remote track: {e}") + img: np.ndarray = q.get_nowait() + except Exception: continue - - # Calculate velocity components based on current speed - dx, dy = self._calculate_velocity_components() + edges = cv2.Canny(img, 100, 200) + img_edges = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR) + if img_edges.shape[:2] != (self.height, self.width): + img_edges = cv2.resize(img_edges, (self.width, self.height)) + frame_array = cv2.addWeighted(frame_array, 0.7, img_edges, 0.3, 0.0) # Update ball position ball = self.ball @@ -225,7 +240,7 @@ class 
AnimatedVideoTrack(MediaStreamTrack):
 
         # Trigger bounce sound if a bounce occurred
         if bounce_occurred and self.audio_track:
             logger.info("Video: Bounce detected, triggering audio event")
-            self.audio_track.add_bounce_event("bounce")
+            self.audio_track.add_bounce_event_at(self.clock.now())
 
         # Keep ball in bounds
         ball["x"] = max(ball["radius"], min(self.width - ball["radius"], ball["x"]))
@@ -272,31 +287,61 @@ class AnimatedVideoTrack(MediaStreamTrack):
 
 
 class SyntheticAudioTrack(MediaStreamTrack):
+    """
+    Synthetic audio track that generates a continuous tone based on ball position
+    plus additional bounce sound effects.
+
+    The frequency of the continuous tone is mapped to the ball's Y position:
+    - Top of screen (Y=0): 800Hz (high pitch)
+    - Bottom of screen (Y=height): 200Hz (low pitch)
+
+    Bounce events add temporary audio effects on top of the continuous tone.
+    """
+
     kind = "audio"
 
-    def __init__(self):
+    def __init__(
+        self, clock: MediaClock, video_track: "AnimatedVideoTrack | None" = None
+    ):
         super().__init__()
         self.sample_rate = 48000
         self.samples_per_frame = 960
         self._samples_generated = 0
         self._active_bounces: list[BounceEvent] = []  # List of active bounce events
+        self.video_track = video_track  # Reference to video track for ball position
+        self.clock = clock
 
-    def add_bounce_event(self, bounce_type: str = "bounce"):
-        """Add a bounce event"""
-        bounce_duration_samples = int(0.2 * self.sample_rate)  # 200ms
-
-        # Add new bounce to the list (they can overlap)
-        bounce_event: BounceEvent = {
-            "start_sample": self._samples_generated,
-            "end_sample": self._samples_generated + bounce_duration_samples,
-            "type": bounce_type,
-        }
-
-        self._active_bounces.append(bounce_event)
-        logger.info(
-            f"Bounce event added - start: {bounce_event['start_sample']}, end: {bounce_event['end_sample']}"
+    def add_bounce_event_at(self, bounce_time_s: float):
+        """Schedule a 200ms bounce effect starting at the given media-clock time (seconds)."""
+        start_sample = int(bounce_time_s * self.sample_rate)
+        duration = int(0.2 * self.sample_rate)
+        self._active_bounces.append(
+            {
+                "type": "bounce",
+                "start_sample": start_sample,
+                "end_sample": start_sample + duration,
+            }
         )
 
+    def _get_ball_frequency(self) -> float:
+        """Get the current frequency based on ball Y position"""
+        if not self.video_track:
+            return 440.0  # Default frequency if no video track
+
+        # Map ball Y position to frequency range (200Hz to 800Hz)
+        ball_y = self.video_track.ball["y"]
+        height = self.video_track.height
+
+        # Normalize Y position (0.0 at top, 1.0 at bottom)
+        normalized_y = ball_y / height
+
+        # Map to frequency range: invert so top = high frequency, bottom = low frequency
+        freq_min = 200.0
+        freq_max = 800.0
+        frequency = freq_max - (normalized_y * (freq_max - freq_min))
+
+        return frequency
+
     def _generate_bounce_sample(self, t: float) -> float:
         """Generate a single bounce sample at time t"""
         if t < 0 or t > 0.2:
@@ -318,52 +363,58 @@ class SyntheticAudioTrack(MediaStreamTrack):
     async def recv(self):
         pts, time_base = await self.next_timestamp()
 
-        samples = np.zeros((self.samples_per_frame,), dtype=np.float32)
-
-        # Generate samples for this frame
-        active_bounce_count = 0
-        for i in range(self.samples_per_frame):
-            current_sample = self._samples_generated + i
-            sample_value = 0.0
+        # --- 1. Generate base tone based on ball Y position ---
+        if self.video_track:
+            base_freq = self._get_ball_frequency()
+        else:
+            base_freq = 440.0  # default if no video track
 
-            # Check all active bounces for this sample
-            for bounce in self._active_bounces:
-                if bounce["start_sample"] <= current_sample < bounce["end_sample"]:
-                    # Calculate time within this bounce
-                    sample_offset = current_sample - bounce["start_sample"]
-                    t = sample_offset / self.sample_rate
+        t = (np.arange(self.samples_per_frame) + pts) / self.sample_rate
+        samples = np.sin(2 * np.pi * base_freq * t).astype(np.float32)
 
-                    # Add this bounce's contribution
-                    sample_value += self._generate_bounce_sample(t)
-                    active_bounce_count += 1
+        # --- 2. Mix in any scheduled bounce sound effects ---
+        frame_start = self._samples_generated
+        frame_end = frame_start + self.samples_per_frame
+        for bounce in self._active_bounces:
+            if bounce["start_sample"] < frame_end and bounce["end_sample"] > frame_start:
+                logger.info("Audio: Generating bounce sound effect")
+                # Per-sample time relative to the bounce start; samples before the
+                # bounce begins are silenced by the envelope mask below
+                tb = (
+                    np.arange(frame_start, frame_end) - bounce["start_sample"]
+                ) / self.sample_rate
+                bounce_freq = 600.0  # Hz
+                bounce_env = np.where(tb >= 0, np.exp(-np.clip(tb, 0.0, None) * 20.0), 0.0)
+                bounce_wave = 0.4 * np.sin(2 * np.pi * bounce_freq * tb) * bounce_env
+                samples = samples + bounce_wave.astype(np.float32)
 
-            samples[i] = sample_value
+        # Drop bounce events that have finished playing
+        self._active_bounces = [
+            b for b in self._active_bounces if b["end_sample"] > frame_end
+        ]
 
-        # Clean up expired bounces
-        self._active_bounces: list[BounceEvent] = [
-            bounce
-            for bounce in self._active_bounces
-            if bounce["end_sample"] > self._samples_generated + self.samples_per_frame
-        ]
+        # --- 3. Stereo panning based on X position ---
+        if self.video_track:
+            pan = self.video_track.ball["x"] / self.video_track.width
+        else:
+            pan = 0.5  # center if no video
+        left_gain = math.cos(pan * math.pi / 2)
+        right_gain = math.sin(pan * math.pi / 2)
 
-        if active_bounce_count > 0:
-            logger.info(
-                f"Generated audio with {len(self._active_bounces)} active bounces"
-            )
+        # --- 4. Volume scaling based on Y position ---
+        if self.video_track:
+            volume = (1.0 - (self.video_track.ball["y"] / self.video_track.height)) ** 2
+        else:
+            volume = 1.0
 
-        self._samples_generated += self.samples_per_frame
+        # --- 5. Apply gain, clip to [-1, 1], and convert to int16 ---
+        left = (np.clip(samples * left_gain * volume, -1.0, 1.0) * 32767).astype(np.int16)
+        right = (np.clip(samples * right_gain * volume, -1.0, 1.0) * 32767).astype(np.int16)
 
+        # --- 6. Interleave channels for s16 format (samples arranged as [L, R, L, R, ...]) ---
+        # Create interleaved array: [left[0], right[0], left[1], right[1], ...]
+ interleaved = np.empty(self.samples_per_frame * 2, dtype=np.int16) + interleaved[0::2] = left # Even indices get left channel + interleaved[1::2] = right # Odd indices get right channel - # Convert to audio frame - samples = np.clip(samples, -1.0, 1.0) - samples_s16 = (samples * 32767).astype(np.int16) + # Reshape to (1, samples*2) as expected by s16 format + stereo = interleaved.reshape(1, -1) - frame = AudioFrame.from_ndarray( - samples_s16.reshape(1, -1), format="s16", layout="stereo" - ) + frame = AudioFrame.from_ndarray(stereo, format="s16", layout="stereo") frame.sample_rate = self.sample_rate frame.pts = pts frame.time_base = fractions.Fraction(time_base).limit_denominator(1000000) - + self._samples_generated += self.samples_per_frame return frame @@ -378,13 +429,20 @@ def create_synthetic_tracks(session_name: str) -> dict[str, MediaStreamTrack]: Dictionary containing 'video' and 'audio' tracks Note: - To change ball speed, use: tracks["video"].set_ball_speed(speed_in_mps) - where speed_in_mps is meters per second (frame width = 1 meter) + - To change ball speed, use: tracks["video"].set_ball_speed(speed_in_mps) + where speed_in_mps is meters per second (frame width = 1 meter) + - Audio generates continuous tone based on ball Y position (200-800Hz) + - Bounce events add additional audio on top of the continuous tone """ - # Create audio track first - audio_track = SyntheticAudioTrack() + media_clock = MediaClock() - # Create video track with reference to audio track for bounce events - video_track = AnimatedVideoTrack(name=session_name, audio_track=audio_track) + # Create video track first + video_track = AnimatedVideoTrack(name=session_name, clock=media_clock) + + # Create audio track with reference to video track for ball position-based frequency + audio_track = SyntheticAudioTrack(video_track=video_track, clock=media_clock) + + # Set the audio track reference on the video track for bounce events + video_track.audio_track = audio_track return {"video": video_track, "audio": audio_track}
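Usage sketch (not applied by this patch): a minimal example of how the factory could be wired into an aiortc peer connection, assuming the module is importable as voicebot.synthetic_media and that offer/answer signaling is handled elsewhere; the session name and ball speed below are illustrative values only.

    import asyncio
    from aiortc import RTCPeerConnection
    from voicebot.synthetic_media import create_synthetic_tracks

    async def main():
        pc = RTCPeerConnection()
        tracks = create_synthetic_tracks("demo-session")
        # Ball speed is meters per second, where the frame width represents 1 meter
        tracks["video"].set_ball_speed(0.5)
        pc.addTrack(tracks["video"])
        pc.addTrack(tracks["audio"])
        # ... offer/answer signaling with the remote peer would happen here ...
        await asyncio.sleep(10)
        await pc.close()

    asyncio.run(main())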