""" Synthetic Media Tracks Module This module provides synthetic audio and video track creation for WebRTC media streaming. Contains AnimatedVideoTrack and SyntheticAudioTrack implementations ported from JavaScript. """ import numpy as np import math import cv2 import fractions import time import random from av.audio.frame import AudioFrame from asyncio import Queue, create_task, sleep from typing import TypedDict, TYPE_CHECKING from aiortc import MediaStreamTrack from av import VideoFrame from logger import logger if TYPE_CHECKING: pass # Shared clock from time import perf_counter class MediaClock: def __init__(self): self.t0 = perf_counter() def now(self) -> float: return perf_counter() - self.t0 class BounceEvent(TypedDict): """Type definition for bounce events""" type: str start_sample: int end_sample: int class AnimatedVideoTrack(MediaStreamTrack): """ Synthetic video track that generates animated content with a bouncing ball. Can also composite remote video tracks with edge detection overlay. Remote video tracks are processed through Canny edge detection and blended with the synthetic ball animation. """ kind = "video" def __init__( self, clock: MediaClock, width: int = 320, height: int = 240, name: str = "", audio_track: "SyntheticAudioTrack | None" = None, ): super().__init__() self.width = width self.height = height self.name = name self.clock = clock self.fps = 15 self._next_frame_index = 0 self.audio_track = audio_track # Reference to the audio track self.remote_video_tracks: list[ MediaStreamTrack ] = [] # Store remote video tracks # Generate color from name hash (similar to JavaScript nameToColor) self.ball_color = ( self._name_to_color(name) if name else (0, 255, 136) ) # Default green # Ball properties ball_radius = min(width, height) * 0.06 self.ball = { "x": random.uniform(ball_radius, width - ball_radius), "y": random.uniform(ball_radius, height - ball_radius), "radius": ball_radius, "speed_mps": 0.5, # Speed in meters per second (frame width = 1 meter) "direction_x": random.uniform( -1.0, 1.0 ), # Random direction x component (-1 to 1) "direction_y": random.uniform( -1.0, 1.0 ), # Random direction y component (-1 to 1) } self.frame_count = 0 self._start_time = time.time() self._last_frame_time = time.time() self.fps = 15 # Target frames per second self._remote_latest = {} # track -> np.ndarray self._remote_tasks: list[ tuple[MediaStreamTrack, object, Queue[np.ndarray]] ] = [] def set_ball_speed(self, speed_mps: float): """Set the ball speed in meters per second""" self.ball["speed_mps"] = speed_mps def add_remote_video_track(self, track: MediaStreamTrack): """Add a remote video track to be composited with edge detection""" if track.kind == "video": self.remote_video_tracks.append(track) logger.info(f"Added remote video track: {track}") q: Queue[np.ndarray] = Queue(maxsize=1) async def pump(): while True: frame = await track.recv() if isinstance(frame, VideoFrame): img: np.ndarray = frame.to_ndarray(format="bgr24") if q.full(): _ = q.get_nowait() await q.put(img) t = create_task(pump()) self._remote_tasks.append((track, t, q)) def remove_remote_video_track(self, track: MediaStreamTrack): """Remove a remote video track""" if track in self.remote_video_tracks: self.remote_video_tracks.remove(track) logger.info(f"Removed remote video track: {track}") def _calculate_velocity_components(self, dt: float) -> tuple[float, float]: dir_x, dir_y = self.ball["direction_x"], self.ball["direction_y"] mag = np.hypot(dir_x, dir_y) if mag == 0: dir_x_norm, dir_y_norm = 1.0, 0.0 else: 
            dir_x_norm, dir_y_norm = dir_x / mag, dir_y / mag
        pixels_per_second = self.width * self.ball["speed_mps"]
        pixels_this_frame = pixels_per_second * dt
        return pixels_this_frame * dir_x_norm, pixels_this_frame * dir_y_norm

    async def next_timestamp(self):
        """Returns (pts, time_base) for video at the configured frame rate"""
        pts = int(self.frame_count * (1 / self.fps) * 90000)
        time_base = 1 / 90000
        return pts, time_base

    def _name_to_color(self, name: str) -> tuple[int, int, int]:
        """Convert name to an HSL color, then to a BGR tuple for OpenCV"""
        # Simple hash function (djb2)
        hash_value = 5381
        for char in name:
            hash_value = ((hash_value << 5) + hash_value + ord(char)) & 0xFFFFFFFF

        # Generate HSL color from hash
        hue = abs(hash_value) % 360
        sat = 60 + (abs(hash_value) % 30)  # 60-89%
        light = 45 + (abs(hash_value) % 30)  # 45-74%

        # Convert HSL to RGB
        h = hue / 360.0
        s = sat / 100.0
        lightness = light / 100.0

        c = (1 - abs(2 * lightness - 1)) * s
        x = c * (1 - abs((h * 6) % 2 - 1))
        m = lightness - c / 2

        if h < 1 / 6:
            r, g, b = c, x, 0
        elif h < 2 / 6:
            r, g, b = x, c, 0
        elif h < 3 / 6:
            r, g, b = 0, c, x
        elif h < 4 / 6:
            r, g, b = 0, x, c
        elif h < 5 / 6:
            r, g, b = x, 0, c
        else:
            r, g, b = c, 0, x

        return (
            int((b + m) * 255),
            int((g + m) * 255),
            int((r + m) * 255),
        )  # BGR for OpenCV

    async def recv(self):
        """Generate video frames at 15 FPS"""
        # Target timestamp for this frame (seconds since t0)
        target_t = self._next_frame_index / self.fps
        now = self.clock.now()
        if target_t > now:
            await sleep(target_t - now)

        # Use constant dt tied to fps (prevents physics jitter)
        dt = 1.0 / self.fps
        dx, dy = self._calculate_velocity_components(dt)

        # PTS derived from frame index, not wall clock
        pts = int(self._next_frame_index * (90000 / self.fps))
        time_base = 1 / 90000
        self._next_frame_index += 1

        # Create black background
        frame_array = np.zeros((self.height, self.width, 3), dtype=np.uint8)

        # Process remote video tracks with edge detection
        for _track, _task, q in self._remote_tasks:
            try:
                img: np.ndarray = q.get_nowait()
            except Exception:
                continue
            edges = cv2.Canny(img, 100, 200)
            img_edges = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
            if img_edges.shape[:2] != (self.height, self.width):
                img_edges = cv2.resize(img_edges, (self.width, self.height))
            frame_array = cv2.addWeighted(frame_array, 0.7, img_edges, 0.3, 0.0)

        # Update ball position
        ball = self.ball
        ball["x"] += dx
        ball["y"] += dy

        # Bounce off walls and trigger audio events
        bounce_occurred = False
        if ball["x"] + ball["radius"] >= self.width or ball["x"] - ball["radius"] <= 0:
            ball["direction_x"] = -ball["direction_x"]
            bounce_occurred = True
        if ball["y"] + ball["radius"] >= self.height or ball["y"] - ball["radius"] <= 0:
            ball["direction_y"] = -ball["direction_y"]
            bounce_occurred = True

        # Trigger bounce sound if a bounce occurred
        if bounce_occurred and self.audio_track:
            logger.info("Video: Bounce detected, triggering audio event")
            self.audio_track.add_bounce_event_at(self.clock.now())

        # Keep ball in bounds
        ball["x"] = max(ball["radius"], min(self.width - ball["radius"], ball["x"]))
        ball["y"] = max(ball["radius"], min(self.height - ball["radius"], ball["y"]))

        # Draw ball
        cv2.circle(
            frame_array,
            (int(ball["x"]), int(ball["y"])),
            int(ball["radius"]),
            self.ball_color,
            -1,
        )

        # Add frame label (wall-clock based) and speed text
        frame_text = f"Frame: {int(time.time() * 1000) % 10000}"
        speed_text = f"Speed: {ball['speed_mps']:.2f} m/s"
        cv2.putText(
            frame_array,
            frame_text,
            (10, 20),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            (255, 255, 255),
            1,
        )
        cv2.putText(
            frame_array,
            speed_text,
            (10, 40),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            (255, 255, 255),
            1,
        )

        # Convert to VideoFrame
        frame = VideoFrame.from_ndarray(frame_array.astype(np.uint8), format="bgr24")
        frame.pts = pts
        frame.time_base = fractions.Fraction(time_base).limit_denominator(1000000)

        self.frame_count += 1
        return frame


class SyntheticAudioTrack(MediaStreamTrack):
    """
    Synthetic audio track that generates a continuous tone based on ball
    position, plus additional bounce sound effects.

    The frequency of the continuous tone is mapped to the ball's Y position:
    - Top of screen (Y=0): 400Hz (high pitch)
    - Bottom of screen (Y=height): 200Hz (low pitch)

    Bounce events add temporary audio effects on top of the continuous tone.
    """

    kind = "audio"

    def __init__(
        self, clock: MediaClock, video_track: "AnimatedVideoTrack | None" = None
    ):
        super().__init__()
        self.sample_rate = 48000
        self.samples_per_frame = 960
        self._samples_generated = 0
        self._active_bounces: list[BounceEvent] = []  # List of active bounce events
        self.video_track = video_track  # Reference to video track for ball position
        self.clock = clock

    def add_bounce_event_at(self, bounce_time_s: float):
        """Schedule a 0.2 s bounce effect starting at the given clock time."""
        start_sample = int(bounce_time_s * self.sample_rate)
        duration = int(0.2 * self.sample_rate)
        self._active_bounces.append(
            {
                "type": "bounce",
                "start_sample": start_sample,
                "end_sample": start_sample + duration,
            }
        )

    def _get_ball_frequency(self) -> float:
        """Get the current frequency based on ball Y position"""
        if not self.video_track:
            return 440.0  # Default frequency if no video track

        # Map ball Y position to frequency range (200Hz to 400Hz)
        ball_y = self.video_track.ball["y"]
        height = self.video_track.height

        # Normalize Y position (0.0 at top, 1.0 at bottom)
        normalized_y = ball_y / height

        # Invert so top = high frequency, bottom = low frequency
        freq_min = 200.0
        freq_max = 400.0
        frequency = freq_max - (normalized_y * (freq_max - freq_min))

        return frequency

    def _generate_bounce_sample(self, t: float) -> float:
        """Generate a single bounce sample at time t"""
        if t < 0 or t > 0.2:
            return 0.0

        # Simple decay envelope
        decay = np.exp(-t * 10)

        # Clear, simple tone
        freq = 400
        sound = np.sin(2 * np.pi * freq * t) * decay

        return sound * 0.9

    async def next_timestamp(self) -> tuple[int, float]:
        pts = self._samples_generated
        time_base = 1 / self.sample_rate
        return pts, time_base

    async def recv(self):
        """Generate one 20 ms stereo audio frame (960 samples at 48 kHz)."""
        pts, time_base = await self.next_timestamp()

        # --- 1. Generate base tone based on ball Y position ---
        if self.video_track:
            base_freq = self._get_ball_frequency()
        else:
            base_freq = 440.0  # default if no video track

        t = (np.arange(self.samples_per_frame) + pts) / self.sample_rate
        samples = np.sin(2 * np.pi * base_freq * t).astype(np.float32)

        # --- 2. Mix in active bounce events scheduled by the video track ---
        frame_start = pts
        frame_end = pts + self.samples_per_frame
        for bounce in list(self._active_bounces):
            if bounce["end_sample"] <= frame_start:
                self._active_bounces.remove(bounce)  # bounce already finished
                continue
            if bounce["start_sample"] >= frame_end:
                continue  # bounce has not started yet
            logger.info("Audio: Generating bounce sound effect")
            # Sample times relative to the bounce start (negative before it begins)
            tb = (
                np.arange(self.samples_per_frame) + frame_start - bounce["start_sample"]
            ) / self.sample_rate
            bounce_freq = 600.0  # Hz
            bounce_env = np.where(
                tb >= 0, np.exp(-np.maximum(tb, 0.0) * 20.0), 0.0
            )  # fast exponential decay
            bounce_wave = 0.4 * np.sin(2 * np.pi * bounce_freq * tb) * bounce_env
            samples = samples + bounce_wave.astype(np.float32)

        # --- 3. Stereo panning based on X position ---
        if self.video_track:
            pan = self.video_track.ball["x"] / self.video_track.width
        else:
            pan = 0.5  # center if no video

        left_gain = math.cos(pan * math.pi / 2)
        right_gain = math.sin(pan * math.pi / 2)

        # --- 4. Volume scaling based on Y position ---
        if self.video_track:
            volume = (1.0 - (self.video_track.ball["y"] / self.video_track.height)) ** 2
        else:
            volume = 1.0

        # --- 5. Apply gain, clamp to [-1, 1], and convert to int16 ---
        mixed = np.clip(samples, -1.0, 1.0)
        left = (mixed * left_gain * volume * 32767).astype(np.int16)
        right = (mixed * right_gain * volume * 32767).astype(np.int16)

        # --- 6. Interleave channels for s16 format (samples arranged as [L, R, L, R, ...]) ---
        # Create interleaved array: [left[0], right[0], left[1], right[1], ...]
        interleaved = np.empty(self.samples_per_frame * 2, dtype=np.int16)
        interleaved[0::2] = left  # Even indices get left channel
        interleaved[1::2] = right  # Odd indices get right channel

        # Reshape to (1, samples*2) as expected by s16 format
        stereo = interleaved.reshape(1, -1)

        frame = AudioFrame.from_ndarray(stereo, format="s16", layout="stereo")
        frame.sample_rate = self.sample_rate
        frame.pts = pts
        frame.time_base = fractions.Fraction(time_base).limit_denominator(1000000)

        self._samples_generated += self.samples_per_frame
        return frame


def create_synthetic_tracks(session_name: str) -> dict[str, MediaStreamTrack]:
    """
    Create synthetic audio and video tracks for WebRTC streaming.

    Args:
        session_name: Name to use for generating video track colors

    Returns:
        Dictionary containing 'video' and 'audio' tracks

    Note:
        - To change ball speed, use: tracks["video"].set_ball_speed(speed_in_mps)
          where speed_in_mps is meters per second (frame width = 1 meter)
        - Audio generates a continuous tone based on ball Y position (200-400Hz)
        - Bounce events add additional audio on top of the continuous tone
    """
    media_clock = MediaClock()

    # Create video track first
    video_track = AnimatedVideoTrack(name=session_name, clock=media_clock)

    # Create audio track with reference to video track for ball position-based frequency
    audio_track = SyntheticAudioTrack(video_track=video_track, clock=media_clock)

    # Set the audio track reference on the video track for bounce events
    video_track.audio_track = audio_track

    return {"video": video_track, "audio": audio_track}
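

# The block below is an illustrative smoke-test sketch, not part of the
# original module: it shows how the tracks returned by create_synthetic_tracks()
# might be exercised standalone, outside any WebRTC peer connection. The
# session name "demo", the 0.8 m/s speed, and the five-frame loop are arbitrary
# example values.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        tracks = create_synthetic_tracks("demo")
        video = tracks["video"]  # AnimatedVideoTrack
        audio = tracks["audio"]  # SyntheticAudioTrack
        video.set_ball_speed(0.8)  # ball crosses 0.8 frame-widths per second

        # Pull a few frames from each track to confirm both generators produce output
        for _ in range(5):
            video_frame = await video.recv()
            audio_frame = await audio.recv()
            logger.info(f"video pts={video_frame.pts} audio pts={audio_frame.pts}")

    asyncio.run(_demo())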