""" Synthetic Media Tracks Module This module provides synthetic audio and video track creation for WebRTC media streaming. Contains AnimatedVideoTrack and SyntheticAudioTrack implementations ported from JavaScript. """ import numpy as np import cv2 import fractions import time import random from typing import TypedDict from aiortc import MediaStreamTrack from av import VideoFrame, AudioFrame from logger import logger class BounceEvent(TypedDict): """Type definition for bounce events""" type: str start_sample: int end_sample: int class AnimatedVideoTrack(MediaStreamTrack): """ Synthetic video track that generates animated content with a bouncing ball. Can also composite remote video tracks with edge detection overlay. Remote video tracks are processed through Canny edge detection and blended with the synthetic ball animation. """ kind = "video" def __init__( self, width: int = 320, height: int = 240, name: str = "", audio_track: "SyntheticAudioTrack | None" = None, ): super().__init__() self.width = width self.height = height self.name = name self.audio_track = audio_track # Reference to the audio track self.remote_video_tracks: list[ MediaStreamTrack ] = [] # Store remote video tracks # Generate color from name hash (similar to JavaScript nameToColor) self.ball_color = ( self._name_to_color(name) if name else (0, 255, 136) ) # Default green # Ball properties ball_radius = min(width, height) * 0.06 self.ball = { "x": random.uniform(ball_radius, width - ball_radius), "y": random.uniform(ball_radius, height - ball_radius), "radius": ball_radius, "speed_mps": 0.5, # Speed in meters per second (frame width = 1 meter) "direction_x": random.uniform( -1.0, 1.0 ), # Random direction x component (-1 to 1) "direction_y": random.uniform( -1.0, 1.0 ), # Random direction y component (-1 to 1) } self.frame_count = 0 self._start_time = time.time() self._last_frame_time = time.time() self.fps = 15 # Target frames per second def set_ball_speed(self, speed_mps: float): """Set the ball speed in meters per second""" self.ball["speed_mps"] = speed_mps def add_remote_video_track(self, track: MediaStreamTrack): """Add a remote video track to be composited with edge detection""" if track.kind == "video": self.remote_video_tracks.append(track) logger.info(f"Added remote video track: {track}") def remove_remote_video_track(self, track: MediaStreamTrack): """Remove a remote video track""" if track in self.remote_video_tracks: self.remote_video_tracks.remove(track) logger.info(f"Removed remote video track: {track}") def _calculate_velocity_components(self) -> tuple[float, float]: """ Calculate dx and dy velocity components based on speed in meters per second. Frame width represents 1 meter, so pixels per second = width * speed_mps """ # Calculate actual time delta since last frame current_time = time.time() dt = current_time - self._last_frame_time self._last_frame_time = current_time # Normalize direction vector to ensure consistent speed dir_x = self.ball["direction_x"] dir_y = self.ball["direction_y"] magnitude = np.sqrt(dir_x * dir_x + dir_y * dir_y) if magnitude > 0: dir_x_norm = dir_x / magnitude dir_y_norm = dir_y / magnitude else: dir_x_norm, dir_y_norm = 1.0, 0.0 # Convert meters per second to pixels per actual time delta pixels_per_second = self.width * self.ball["speed_mps"] pixels_this_frame = pixels_per_second * dt # Apply normalized direction to get velocity components dx = pixels_this_frame * dir_x_norm dy = pixels_this_frame * dir_y_norm return dx, dy async def next_timestamp(self): """Returns (pts, time_base) for 15 FPS video""" pts = int(self.frame_count * (1 / 15) * 90000) time_base = 1 / 90000 return pts, time_base def _name_to_color(self, name: str) -> tuple[int, int, int]: """Convert name to HSL color, then to RGB tuple""" # Simple hash function (djb2) hash_value = 5381 for char in name: hash_value = ((hash_value << 5) + hash_value + ord(char)) & 0xFFFFFFFF # Generate HSL color from hash hue = abs(hash_value) % 360 sat = 60 + (abs(hash_value) % 30) # 60-89% light = 45 + (abs(hash_value) % 30) # 45-74% # Convert HSL to RGB h = hue / 360.0 s = sat / 100.0 lightness = light / 100.0 c = (1 - abs(2 * lightness - 1)) * s x = c * (1 - abs((h * 6) % 2 - 1)) m = lightness - c / 2 if h < 1 / 6: r, g, b = c, x, 0 elif h < 2 / 6: r, g, b = x, c, 0 elif h < 3 / 6: r, g, b = 0, c, x elif h < 4 / 6: r, g, b = 0, x, c elif h < 5 / 6: r, g, b = x, 0, c else: r, g, b = c, 0, x return ( int((b + m) * 255), int((g + m) * 255), int((r + m) * 255), ) # BGR for OpenCV async def recv(self): """Generate video frames at 15 FPS""" pts, time_base = await self.next_timestamp() # Create black background frame_array = np.zeros((self.height, self.width, 3), dtype=np.uint8) # Process remote video tracks with edge detection for track in self.remote_video_tracks: try: # Get the latest frame from the remote track (non-blocking) remote_frame = await track.recv() if remote_frame and isinstance(remote_frame, VideoFrame): # Convert to numpy array img: np.ndarray = remote_frame.to_ndarray(format="bgr24") # Apply edge detection edges = cv2.Canny(img, 100, 200) img_edges = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR) # Resize to match our canvas size if needed if img_edges.shape[:2] != (self.height, self.width): img_edges = cv2.resize(img_edges, (self.width, self.height)) # Blend with existing frame (additive blend for edge detection overlay) frame_array = cv2.addWeighted( frame_array.astype(np.uint8), 0.7, img_edges.astype(np.uint8), 0.3, 0, ) except Exception as e: # If we can't get a frame from this track, continue with others logger.debug(f"Could not get frame from remote track: {e}") continue # Calculate velocity components based on current speed dx, dy = self._calculate_velocity_components() # Update ball position ball = self.ball ball["x"] += dx ball["y"] += dy # Bounce off walls and trigger audio events bounce_occurred = False if ball["x"] + ball["radius"] >= self.width or ball["x"] - ball["radius"] <= 0: ball["direction_x"] = -ball["direction_x"] bounce_occurred = True if ball["y"] + ball["radius"] >= self.height or ball["y"] - ball["radius"] <= 0: ball["direction_y"] = -ball["direction_y"] bounce_occurred = True # Trigger bounce sound if a bounce occurred if bounce_occurred and self.audio_track: logger.info("Video: Bounce detected, triggering audio event") self.audio_track.add_bounce_event("bounce") # Keep ball in bounds ball["x"] = max(ball["radius"], min(self.width - ball["radius"], ball["x"])) ball["y"] = max(ball["radius"], min(self.height - ball["radius"], ball["y"])) # Draw ball cv2.circle( frame_array, (int(ball["x"]), int(ball["y"])), int(ball["radius"]), self.ball_color, -1, ) # Add frame counter and speed text frame_text = f"Frame: {int(time.time() * 1000) % 10000}" speed_text = f"Speed: {ball['speed_mps']:.2f} m/s" cv2.putText( frame_array, frame_text, (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, ) cv2.putText( frame_array, speed_text, (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, ) # Convert to VideoFrame frame = VideoFrame.from_ndarray(frame_array.astype(np.uint8), format="bgr24") frame.pts = pts frame.time_base = fractions.Fraction(time_base).limit_denominator(1000000) self.frame_count += 1 return frame class SyntheticAudioTrack(MediaStreamTrack): kind = "audio" def __init__(self): super().__init__() self.sample_rate = 48000 self.samples_per_frame = 960 self._samples_generated = 0 self._active_bounces: list[BounceEvent] = [] # List of active bounce events def add_bounce_event(self, bounce_type: str = "bounce"): """Add a bounce event""" bounce_duration_samples = int(0.2 * self.sample_rate) # 200ms # Add new bounce to the list (they can overlap) bounce_event: BounceEvent = { "start_sample": self._samples_generated, "end_sample": self._samples_generated + bounce_duration_samples, "type": bounce_type, } self._active_bounces.append(bounce_event) logger.info( f"Bounce event added - start: {bounce_event['start_sample']}, end: {bounce_event['end_sample']}" ) def _generate_bounce_sample(self, t: float) -> float: """Generate a single bounce sample at time t""" if t < 0 or t > 0.2: return 0.0 # Simple decay envelope decay = np.exp(-t * 10) # Clear, simple tone freq = 400 sound = np.sin(2 * np.pi * freq * t) * decay return sound * 0.9 async def next_timestamp(self) -> tuple[int, float]: pts = self._samples_generated time_base = 1 / self.sample_rate return pts, time_base async def recv(self): pts, time_base = await self.next_timestamp() samples = np.zeros((self.samples_per_frame,), dtype=np.float32) # Generate samples for this frame active_bounce_count = 0 for i in range(self.samples_per_frame): current_sample = self._samples_generated + i sample_value = 0.0 # Check all active bounces for this sample for bounce in self._active_bounces: if bounce["start_sample"] <= current_sample < bounce["end_sample"]: # Calculate time within this bounce sample_offset = current_sample - bounce["start_sample"] t = sample_offset / self.sample_rate # Add this bounce's contribution sample_value += self._generate_bounce_sample(t) active_bounce_count += 1 samples[i] = sample_value # Clean up expired bounces self._active_bounces: list[BounceEvent] = [ bounce for bounce in self._active_bounces if bounce["end_sample"] > self._samples_generated + self.samples_per_frame ] if active_bounce_count > 0: logger.info( f"Generated audio with {len(self._active_bounces)} active bounces" ) self._samples_generated += self.samples_per_frame # Convert to audio frame samples = np.clip(samples, -1.0, 1.0) samples_s16 = (samples * 32767).astype(np.int16) frame = AudioFrame.from_ndarray( samples_s16.reshape(1, -1), format="s16", layout="stereo" ) frame.sample_rate = self.sample_rate frame.pts = pts frame.time_base = fractions.Fraction(time_base).limit_denominator(1000000) return frame def create_synthetic_tracks(session_name: str) -> dict[str, MediaStreamTrack]: """ Create synthetic audio and video tracks for WebRTC streaming. Args: session_name: Name to use for generating video track colors Returns: Dictionary containing 'video' and 'audio' tracks Note: To change ball speed, use: tracks["video"].set_ball_speed(speed_in_mps) where speed_in_mps is meters per second (frame width = 1 meter) """ # Create audio track first audio_track = SyntheticAudioTrack() # Create video track with reference to audio track for bounce events video_track = AnimatedVideoTrack(name=session_name, audio_track=audio_track) return {"video": video_track, "audio": audio_track}