""" Synthetic Media Tracks Module This module provides synthetic audio and video track creation for WebRTC media streaming. Contains AnimatedVideoTrack and SyntheticAudioTrack implementations ported from JavaScript. """ import numpy as np import math import cv2 import fractions import time import random from av.audio.frame import AudioFrame from asyncio import Queue, create_task, sleep from typing import TypedDict, TYPE_CHECKING, Tuple, List, Optional, Any import numpy.typing as npt from aiortc import MediaStreamTrack from av import VideoFrame from logger import logger if TYPE_CHECKING: pass # Shared clock from time import perf_counter class MediaClock: def __init__(self): self.t0 = perf_counter() def now(self) -> float: return perf_counter() - self.t0 class BounceEvent(TypedDict): """Type definition for bounce events""" type: str start_sample: int end_sample: int class AnimatedVideoTrack(MediaStreamTrack): """Animated synthetic video track (fixed, properly-indented implementation).""" kind = "video" def __init__( self, clock: MediaClock, width: int = 320, height: int = 240, name: str = "", audio_track: Optional["SyntheticAudioTrack"] = None, ) -> None: super().__init__() self.width = width self.height = height self.name = name self.clock = clock self._next_frame_index = 0 self.audio_track = audio_track self.remote_video_tracks: List[MediaStreamTrack] = [] self.ball_color = self._name_to_color(name) if name else (0, 255, 136) ball_radius = min(width, height) * 0.06 self.ball = { "x": random.uniform(ball_radius, width - ball_radius), "y": random.uniform(ball_radius, height - ball_radius), "radius": ball_radius, "speed_mps": 0.5, "direction_x": random.uniform(-1.0, 1.0), "direction_y": random.uniform(-1.0, 1.0), } self.frame_count = 0 self._start_time = time.time() self._last_frame_time = time.time() self.fps = 15 self._remote_latest: dict[MediaStreamTrack, npt.NDArray[Any]] = {} self._remote_tasks: List[Tuple[MediaStreamTrack, object, Queue[npt.NDArray[Any]]]] = [] def set_ball_speed(self, speed_mps: float) -> None: self.ball["speed_mps"] = speed_mps def add_remote_video_track(self, track: MediaStreamTrack) -> None: if track.kind != "video": return self.remote_video_tracks.append(track) logger.info("Added remote video track: %s", track) q: Queue[npt.NDArray[Any]] = Queue(maxsize=1) async def pump() -> None: while True: frame = await track.recv() if isinstance(frame, VideoFrame): img = frame.to_ndarray(format="bgr24") if q.full(): try: _ = q.get_nowait() except Exception: pass await q.put(img) t = create_task(pump()) self._remote_tasks.append((track, t, q)) def remove_remote_video_track(self, track: MediaStreamTrack) -> None: if track in self.remote_video_tracks: self.remote_video_tracks.remove(track) logger.info("Removed remote video track: %s", track) def _calculate_velocity_components(self, dt: float) -> Tuple[float, float]: dir_x = float(self.ball["direction_x"]) dir_y = float(self.ball["direction_y"]) mag = math.hypot(dir_x, dir_y) if mag == 0: dir_x_norm, dir_y_norm = 1.0, 0.0 else: dir_x_norm, dir_y_norm = dir_x / mag, dir_y / mag pixels_per_second = self.width * float(self.ball["speed_mps"]) pixels_this_frame = pixels_per_second * dt return pixels_this_frame * dir_x_norm, pixels_this_frame * dir_y_norm async def next_timestamp(self) -> Tuple[int, float]: pts = int(self.frame_count * (1 / self.fps) * 90000) time_base = 1 / 90000 return pts, time_base def _name_to_color(self, name: str) -> Tuple[int, int, int]: hash_value = 5381 for ch in name: hash_value = ((hash_value << 5) + 
hash_value + ord(ch)) & 0xFFFFFFFF hue = abs(hash_value) % 360 sat = 60 + (abs(hash_value) % 30) light = 45 + (abs(hash_value) % 30) h = hue / 360.0 s = sat / 100.0 lightness = light / 100.0 c = (1 - abs(2 * lightness - 1)) * s x = c * (1 - abs((h * 6) % 2 - 1)) m = lightness - c / 2 if h < 1 / 6: r, g, b = c, x, 0 elif h < 2 / 6: r, g, b = x, c, 0 elif h < 3 / 6: r, g, b = 0, c, x elif h < 4 / 6: r, g, b = 0, x, c elif h < 5 / 6: r, g, b = x, 0, c else: r, g, b = c, 0, x return int((b + m) * 255), int((g + m) * 255), int((r + m) * 255) async def recv(self) -> VideoFrame: pts, time_base = await self.next_timestamp() # schedule frame according to clock target_t = self._next_frame_index / self.fps now = self.clock.now() if target_t > now: await sleep(target_t - now) dt = 1.0 / self.fps dx, dy = self._calculate_velocity_components(dt) pts = int(self._next_frame_index * (90000 / self.fps)) time_base = 1 / 90000 self._next_frame_index += 1 frame_array: npt.NDArray[np.uint8] = np.zeros((self.height, self.width, 3), dtype=np.uint8) # Blend in edge-detected remote frames if available for _track, _task, q in self._remote_tasks: try: img = q.get_nowait() except Exception: continue edges = cv2.Canny(img, 100, 200) img_edges = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR) if img_edges.shape[:2] != (self.height, self.width): img_edges = cv2.resize(img_edges, (self.width, self.height)) frame_array = cv2.addWeighted(frame_array, 0.7, img_edges, 0.3, 0.0).astype(np.uint8) # Update ball position and handle bouncing ball = self.ball ball["x"] += dx ball["y"] += dy bounce_occurred = False if ball["x"] + ball["radius"] >= self.width or ball["x"] - ball["radius"] <= 0: ball["direction_x"] = -ball["direction_x"] bounce_occurred = True if ball["y"] + ball["radius"] >= self.height or ball["y"] - ball["radius"] <= 0: ball["direction_y"] = -ball["direction_y"] bounce_occurred = True if bounce_occurred and self.audio_track is not None: logger.info("Video: Bounce detected, triggering audio event") self.audio_track.add_bounce_event_at(self.clock.now()) ball["x"] = max(ball["radius"], min(self.width - ball["radius"], ball["x"])) ball["y"] = max(ball["radius"], min(self.height - ball["radius"], ball["y"])) cv2.circle(frame_array, (int(ball["x"]), int(ball["y"])), int(ball["radius"]), self.ball_color, -1) frame_text = f"Frame: {int(time.time() * 1000) % 10000}" speed_text = f"Speed: {ball['speed_mps']:.2f} m/s" cv2.putText(frame_array, frame_text, (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) cv2.putText(frame_array, speed_text, (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) frame = VideoFrame.from_ndarray(frame_array, format="bgr24") frame.pts = pts frame.time_base = fractions.Fraction(time_base).limit_denominator(1000000) self.frame_count += 1 return frame class SyntheticAudioTrack(MediaStreamTrack): """ Synthetic audio track that generates continuous tones based on ball position and additional bounce sound effects. The frequency of the continuous tone is mapped to the ball's Y position: - Top of screen (Y=0): 800Hz (high pitch) - Bottom of screen (Y=height): 200Hz (low pitch) Bounce events add temporary audio effects on top of the continuous tone. 
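# Usage sketch (illustrative, not part of the original module): remote video is
# typically fed into the animation from an aiortc RTCPeerConnection "track" event,
# so its edge-detected frames get blended into the synthetic output. The peer
# connection `pc` and the event wiring below are assumptions about the calling
# code; only add_remote_video_track / remove_remote_video_track are defined here.
#
#     @pc.on("track")
#     def on_track(track: MediaStreamTrack) -> None:
#         if track.kind == "video":
#             video_track.add_remote_video_track(track)
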
""" kind = "audio" def __init__( self, clock: MediaClock, video_track: "AnimatedVideoTrack | None" = None ): super().__init__() self.sample_rate = 48000 self.samples_per_frame = 960 self._samples_generated = 0 self._active_bounces: list[BounceEvent] = [] # List of active bounce events self.video_track = video_track # Reference to video track for ball position self.clock = clock def add_bounce_event_at(self, bounce_time_s: float): start_sample = int(bounce_time_s * self.sample_rate) duration = int(0.2 * self.sample_rate) self._active_bounces.append( { "type": "bounce", "start_sample": start_sample, "end_sample": start_sample + duration, } ) def _get_ball_frequency(self) -> float: """Get the current frequency based on ball Y position""" if not self.video_track: return 440.0 # Default frequency if no video track # Map ball Y position to frequency range (200Hz to 800Hz) ball_y = self.video_track.ball["y"] height = self.video_track.height # Normalize Y position (0.0 at top, 1.0 at bottom) normalized_y = ball_y / height # Map to frequency range (higher pitch for higher position, lower for lower) # Invert so top = high frequency, bottom = low frequency freq_min = 200.0 freq_max = 400.0 frequency = freq_max - (normalized_y * (freq_max - freq_min)) return frequency def _generate_bounce_sample(self, t: float) -> float: """Generate a single bounce sample at time t""" if t < 0 or t > 0.2: return 0.0 # Simple decay envelope decay = np.exp(-t * 10) # Clear, simple tone freq = 400 sound = np.sin(2 * np.pi * freq * t) * decay return sound * 0.9 async def next_timestamp(self) -> tuple[int, float]: pts = self._samples_generated time_base = 1 / self.sample_rate return pts, time_base async def recv(self): """ Generate audio frame with position-based tone and bounce effects. Audio Processing Pipeline: 1. Base tone generation (frequency based on ball Y position) 2. Bounce effect generation (separate, centered audio) 3. Stereo panning (applied to base tone only) 4. Volume compensation (based on ball Y position) 5. Audio mixing and clipping prevention 6. Final conversion to int16 stereo format """ pts, time_base = await self.next_timestamp() # --- 1. TONE GENERATION: Create base frequency tone based on ball Y position --- # Frequency mapping: Top of screen = high pitch (400Hz), bottom = low pitch (200Hz) if self.video_track: base_freq = self._get_ball_frequency() # 200-400Hz range else: base_freq = 440.0 # default A4 if no video track # Generate sine wave at calculated frequency t = (np.arange(self.samples_per_frame) + pts) / self.sample_rate base_samples = np.sin(2 * np.pi * base_freq * t).astype(np.float32) # --- 2. 
BOUNCE EFFECTS: Generate separate bounce sound effects (centered audio) --- # Bounce effects are generated independently to avoid being affected by panning bounce_samples = np.zeros(self.samples_per_frame, dtype=np.float32) current_time_s = self.clock.now() current_sample = int(current_time_s * self.sample_rate) for bounce in self._active_bounces: if bounce["start_sample"] <= current_sample < bounce["end_sample"]: # Calculate relative time within this specific bounce event sample_offset = current_sample - bounce["start_sample"] bounce_t = sample_offset / self.sample_rate # Generate bounce waveform: 600Hz tone with exponential decay envelope tb = np.arange(self.samples_per_frame) / self.sample_rate + bounce_t bounce_freq = 600.0 # Hz (higher than base tone for clarity) bounce_env = np.exp( -tb * 20.0 ) # Fast exponential decay (20.0 = decay rate) bounce_wave = ( 0.8 * np.sin(2 * np.pi * bounce_freq * tb) * bounce_env ) # 0.8 = bounce amplitude (80% of full scale) # Limit bounce duration to prevent runaway effects valid_samples = tb < 0.2 # 200ms maximum bounce duration bounce_wave[~valid_samples] = 0 # Accumulate bounce effects (multiple bounces can overlap) bounce_samples = bounce_samples + bounce_wave.astype(np.float32) # Clean up expired bounce events to prevent memory accumulation self._active_bounces = [ bounce for bounce in self._active_bounces if bounce["end_sample"] > current_sample ] # --- 3. STEREO PANNING: Apply left/right positioning to base tone only --- # Pan calculation: 0.0 = full left, 0.5 = center, 1.0 = full right if self.video_track: pan = ( self.video_track.ball["x"] / self.video_track.width ) # Normalize to 0-1 else: pan = 0.5 # Center positioning if no video track # Equal-power panning: maintains perceived loudness across stereo field left_gain = math.cos(pan * math.pi / 2) # Left channel gain (1.0 to 0.0) right_gain = math.sin(pan * math.pi / 2) # Right channel gain (0.0 to 1.0) # --- 4. VOLUME COMPENSATION: Apply Y-position based volume scaling --- # Volume scaling compensates for perceptual frequency/amplitude relationship if self.video_track: # Scale volume from 50% (top) to 20% (bottom) # Formula: Map normalized_y from [0,1] to volume range [0.4, 0.2] normalized_y = self.video_track.ball["y"] / self.video_track.height volume = 0.4 - (normalized_y * 0.3) # 0.5 - (0 to 1) * 0.3 = 0.4 to 0.2 else: volume = 0.35 # Mid-range volume (35%) if no video track # --- 5. AUDIO MIXING: Combine panned base tone with centered bounce effects --- # Base tone: Apply stereo panning and volume compensation left_base = base_samples * left_gain * volume right_base = base_samples * right_gain * volume # Final mix: Add bounce effects equally to both channels (no panning) # This keeps bounce effects prominent and centered regardless of ball position left_total = left_base + bounce_samples right_total = right_base + bounce_samples # --- 6. 
CLIPPING PREVENTION: Dynamic normalization with headroom management --- # Check peak amplitude across both channels to detect potential clipping max_left = np.max(np.abs(left_total)) max_right = np.max(np.abs(right_total)) max_amplitude = max(max_left, max_right) # HEADROOM: Maintain 5% safety margin (0.95 threshold) to prevent digital artifacts if max_amplitude > 0.95: # Threshold chosen to leave headroom for codec/DAC # NORMALIZATION: Scale down entire signal to prevent clipping while preserving dynamics normalization_factor = 0.95 / max_amplitude # Proportional scaling left_total *= normalization_factor right_total *= normalization_factor logger.debug( f"Audio normalization applied: peak={max_amplitude:.3f}, factor={normalization_factor:.3f}" ) # FINAL CONVERSION: Convert to int16 with hard clipping as ultimate safety net # np.clip ensures values never exceed int16 range (-32768 to 32767) left = np.clip(left_total * 32767, -32767, 32767).astype(np.int16) right = np.clip(right_total * 32767, -32767, 32767).astype(np.int16) # --- 7. Interleave channels for s16 format (samples arranged as [L, R, L, R, ...]) --- # Create interleaved array: [left[0], right[0], left[1], right[1], ...] interleaved = np.empty(self.samples_per_frame * 2, dtype=np.int16) interleaved[0::2] = left # Even indices get left channel interleaved[1::2] = right # Odd indices get right channel # Reshape to (1, samples*2) as expected by s16 format stereo = interleaved.reshape(1, -1) frame = AudioFrame.from_ndarray(stereo, format="s16", layout="stereo") frame.sample_rate = self.sample_rate frame.pts = pts frame.time_base = fractions.Fraction(time_base).limit_denominator(1000000) self._samples_generated += self.samples_per_frame return frame def create_synthetic_tracks(session_name: str) -> dict[str, MediaStreamTrack]: """ Create synthetic audio and video tracks for WebRTC streaming. Args: session_name: Name to use for generating video track colors Returns: Dictionary containing 'video' and 'audio' tracks Note: - To change ball speed, use: tracks["video"].set_ball_speed(speed_in_mps) where speed_in_mps is meters per second (frame width = 1 meter) - Audio generates continuous tone based on ball Y position (200-800Hz) - Bounce events add additional audio on top of the continuous tone """ media_clock = MediaClock() # Create video track first video_track = AnimatedVideoTrack(name=session_name, clock=media_clock) # Create audio track with reference to video track for ball position-based frequency audio_track = SyntheticAudioTrack(video_track=video_track, clock=media_clock) # Set the audio track reference on the video track for bounce events video_track.audio_track = audio_track return {"video": video_track, "audio": audio_track}
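

# Quick smoke test (illustrative sketch, not part of the original module): pulls a
# few frames from the synthetic tracks to confirm they yield timestamped AV frames.
# It only uses names defined above; running it requires the module's dependencies
# (aiortc, av, opencv-python, numpy) and the project's `logger`.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        tracks = create_synthetic_tracks("demo-session")
        # Frame width maps to 1 meter, so 0.5 m/s crosses the frame in about 2 s.
        tracks["video"].set_ball_speed(0.5)
        for _ in range(3):
            vframe = await tracks["video"].recv()
            aframe = await tracks["audio"].recv()
            logger.info("video pts=%s audio pts=%s", vframe.pts, aframe.pts)

    asyncio.run(_demo())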