""" Synthetic Media Tracks Module This module provides synthetic audio and video track creation for WebRTC media streaming. Contains AnimatedVideoTrack and SyntheticAudioTrack implementations ported from JavaScript. """ import numpy as np import cv2 import fractions import time from typing import TypedDict from aiortc import MediaStreamTrack from av import VideoFrame, AudioFrame class BounceEvent(TypedDict): """Type definition for bounce events""" type: str start_time: float end_time: float class AnimatedVideoTrack(MediaStreamTrack): """ Synthetic video track that generates animated content with a bouncing ball. Ported from JavaScript createAnimatedVideoTrack function. """ kind = "video" def __init__( self, width: int = 320, height: int = 240, name: str = "", audio_track: "SyntheticAudioTrack | None" = None, ): super().__init__() self.width = width self.height = height self.name = name self.audio_track = audio_track # Reference to the audio track # Generate color from name hash (similar to JavaScript nameToColor) self.ball_color = ( self._name_to_color(name) if name else (0, 255, 136) ) # Default green # Ball properties self.ball = { "x": width / 2, "y": height / 2, "radius": min(width, height) * 0.06, "speed_mps": 0.5, # Speed in meters per second (frame width = 1 meter) "direction_x": 1.0, # Direction vector x component (-1 to 1) "direction_y": 0.6, # Direction vector y component (-1 to 1) } self.frame_count = 0 self._start_time = time.time() self._last_frame_time = time.time() self.fps = 15 # Target frames per second def set_ball_speed(self, speed_mps: float): """Set the ball speed in meters per second""" self.ball["speed_mps"] = speed_mps def _calculate_velocity_components(self) -> tuple[float, float]: """ Calculate dx and dy velocity components based on speed in meters per second. Frame width represents 1 meter, so pixels per second = width * speed_mps """ # Calculate actual time delta since last frame current_time = time.time() dt = current_time - self._last_frame_time self._last_frame_time = current_time # Normalize direction vector to ensure consistent speed dir_x = self.ball["direction_x"] dir_y = self.ball["direction_y"] magnitude = np.sqrt(dir_x * dir_x + dir_y * dir_y) if magnitude > 0: dir_x_norm = dir_x / magnitude dir_y_norm = dir_y / magnitude else: dir_x_norm, dir_y_norm = 1.0, 0.0 # Convert meters per second to pixels per actual time delta pixels_per_second = self.width * self.ball["speed_mps"] pixels_this_frame = pixels_per_second * dt # Apply normalized direction to get velocity components dx = pixels_this_frame * dir_x_norm dy = pixels_this_frame * dir_y_norm return dx, dy async def next_timestamp(self): """Returns (pts, time_base) for 15 FPS video""" pts = int(self.frame_count * (1 / 15) * 90000) time_base = 1 / 90000 return pts, time_base def _name_to_color(self, name: str) -> tuple[int, int, int]: """Convert name to HSL color, then to RGB tuple""" # Simple hash function (djb2) hash_value = 5381 for char in name: hash_value = ((hash_value << 5) + hash_value + ord(char)) & 0xFFFFFFFF # Generate HSL color from hash hue = abs(hash_value) % 360 sat = 60 + (abs(hash_value) % 30) # 60-89% light = 45 + (abs(hash_value) % 30) # 45-74% # Convert HSL to RGB h = hue / 360.0 s = sat / 100.0 lightness = light / 100.0 c = (1 - abs(2 * lightness - 1)) * s x = c * (1 - abs((h * 6) % 2 - 1)) m = lightness - c / 2 if h < 1 / 6: r, g, b = c, x, 0 elif h < 2 / 6: r, g, b = x, c, 0 elif h < 3 / 6: r, g, b = 0, c, x elif h < 4 / 6: r, g, b = 0, x, c elif h < 5 / 6: r, g, b = x, 0, c else: r, g, b = c, 0, x return ( int((b + m) * 255), int((g + m) * 255), int((r + m) * 255), ) # BGR for OpenCV async def recv(self): """Generate video frames at 15 FPS""" pts, time_base = await self.next_timestamp() # Create black background frame_array = np.zeros((self.height, self.width, 3), dtype=np.uint8) # Calculate velocity components based on current speed dx, dy = self._calculate_velocity_components() # Update ball position ball = self.ball ball["x"] += dx ball["y"] += dy # Bounce off walls and trigger audio events bounce_occurred = False if ball["x"] + ball["radius"] >= self.width or ball["x"] - ball["radius"] <= 0: ball["direction_x"] = -ball["direction_x"] bounce_occurred = True if ball["y"] + ball["radius"] >= self.height or ball["y"] - ball["radius"] <= 0: ball["direction_y"] = -ball["direction_y"] bounce_occurred = True # Trigger bounce sound if a bounce occurred if bounce_occurred and self.audio_track: self.audio_track.add_bounce_event("bounce") # Keep ball in bounds ball["x"] = max(ball["radius"], min(self.width - ball["radius"], ball["x"])) ball["y"] = max(ball["radius"], min(self.height - ball["radius"], ball["y"])) # Draw ball cv2.circle( frame_array, (int(ball["x"]), int(ball["y"])), int(ball["radius"]), self.ball_color, -1, ) # Add frame counter and speed text frame_text = f"Frame: {int(time.time() * 1000) % 10000}" speed_text = f"Speed: {ball['speed_mps']:.2f} m/s" cv2.putText( frame_array, frame_text, (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, ) cv2.putText( frame_array, speed_text, (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, ) # Convert to VideoFrame frame = VideoFrame.from_ndarray(frame_array, format="bgr24") frame.pts = pts frame.time_base = fractions.Fraction(time_base).limit_denominator(1000000) self.frame_count += 1 return frame class SyntheticAudioTrack(MediaStreamTrack): """ Synthetic audio track that generates audio including bounce sounds. Originally a silent audio track, now enhanced to generate synthetic audio effects. """ kind = "audio" def __init__(self): super().__init__() self.sample_rate = 48000 self.samples_per_frame = 960 # 20ms at 48kHz self.bounce_queue: list[BounceEvent] = [] # Queue of bounce events to process self.bounce_duration = 0.1 # 100ms bounce sound duration self.bounce_amplitude = 0.3 # Amplitude of bounce sound def add_bounce_event(self, bounce_type: str = "bounce"): """Add a bounce event to the audio queue""" current_time = time.time() self.bounce_queue.append( { "type": bounce_type, "start_time": current_time, "end_time": current_time + self.bounce_duration, } ) def _generate_bounce_sound(self, t: float) -> float: """Generate a simple bounce sound using a decaying sine wave""" # Simple bounce sound: combination of two frequencies with decay freq1 = 800 # Primary frequency freq2 = 1200 # Secondary frequency decay = np.exp(-t * 10) # Exponential decay sound = ( np.sin(2 * np.pi * freq1 * t) * 0.7 + np.sin(2 * np.pi * freq2 * t) * 0.3 ) * decay return sound * self.bounce_amplitude async def next_timestamp(self): """Returns (pts, time_base) for 20ms audio frames at 48kHz""" pts = int(time.time() * self.sample_rate) time_base = 1 / self.sample_rate return pts, time_base async def recv(self): """Generate audio frames with bounce sounds""" pts, time_base = await self.next_timestamp() current_time = time.time() # Create audio data samples = np.zeros((self.samples_per_frame,), dtype=np.float32) # Check for active bounce events and generate sounds active_bounces: list[BounceEvent] = [] for bounce in self.bounce_queue: if current_time < bounce["end_time"]: # Calculate time within the bounce sound t = current_time - bounce["start_time"] if t >= 0: # Generate bounce sound for this time frame for i in range(self.samples_per_frame): sample_time = t + (i / self.sample_rate) if sample_time <= self.bounce_duration: samples[i] += self._generate_bounce_sound(sample_time) active_bounces.append(bounce) # Keep only active bounces self.bounce_queue = active_bounces # Clamp samples to prevent distortion samples = np.clip(samples, -1.0, 1.0) # Convert to s16 format (required by Opus encoder) samples_s16 = (samples * 32767).astype(np.int16) # Convert to AudioFrame frame = AudioFrame.from_ndarray( samples_s16.reshape(1, -1), format="s16", layout="mono" ) frame.sample_rate = self.sample_rate frame.pts = pts frame.time_base = fractions.Fraction(time_base).limit_denominator(1000000) return frame def create_synthetic_tracks(session_name: str) -> dict[str, MediaStreamTrack]: """ Create synthetic audio and video tracks for WebRTC streaming. Args: session_name: Name to use for generating video track colors Returns: Dictionary containing 'video' and 'audio' tracks Note: To change ball speed, use: tracks["video"].set_ball_speed(speed_in_mps) where speed_in_mps is meters per second (frame width = 1 meter) """ # Create audio track first audio_track = SyntheticAudioTrack() # Create video track with reference to audio track for bounce events video_track = AnimatedVideoTrack(name=session_name, audio_track=audio_track) return {"video": video_track, "audio": audio_track}