477 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			477 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """
 | |
| Synthetic Media Tracks Module (bots/synthetic_media)
 | |
| 
 | |
| Copied from voicebot/synthetic_media. This module contains the real implementation
 | |
| and heavy dependencies and is intended to be imported lazily by the orchestrator
 | |
| or runtime controllers.
 | |
| """
 | |
| 
 | |
# Full implementation follows. It mirrors voicebot/synthetic_media and pulls in heavy dependencies (numpy, OpenCV, PyAV, aiortc), hence the lazy import.
 | |
| 
 | |
| import numpy as np
 | |
| import math
 | |
| import cv2
 | |
| import fractions
 | |
| import time
 | |
| import random
 | |
| from av.audio.frame import AudioFrame
 | |
| from asyncio import Queue, create_task, sleep
 | |
| from typing import TypedDict, TYPE_CHECKING, Tuple, List, Optional, Any
 | |
| import numpy.typing as npt
 | |
| from aiortc import MediaStreamTrack
 | |
| from av import VideoFrame
 | |
| from shared.logger import logger
 | |
| 
 | |
if TYPE_CHECKING:
    # Reserved for imports needed only by static type checkers. None are
    # required at the moment, so this branch is intentionally empty.
    ...
 | |
| 
 | |
| # Shared clock
 | |
| from time import perf_counter
 | |
| 
 | |
| 
 | |
class MediaClock:
    """Shared clock for the synthetic tracks; time zero is the moment of construction."""

    def __init__(self) -> None:
        # Reference reading of the high-resolution performance counter.
        self.t0 = perf_counter()

    def now(self) -> float:
        """Return the seconds elapsed since this clock was created."""
        elapsed = perf_counter() - self.t0
        return elapsed
 | |
| 
 | |
| 
 | |
class BounceEvent(TypedDict):
    """A scheduled bounce sound expressed as a range of audio sample indices."""

    # Event kind tag (the audio track stores "bounce").
    type: str
    # First audio sample index at which the sound begins.
    start_sample: int
    # Exclusive end: the first sample index after the sound has finished.
    end_sample: int
 | |
| 
 | |
| 
 | |
class AnimatedVideoTrack(MediaStreamTrack):
    """Synthetic video track: a bouncing ball drawn over edge-detected remote video.

    Frames are rendered at ``fps`` frames per second and paced against a shared
    :class:`MediaClock`. The companion :class:`SyntheticAudioTrack` uses the same
    clock, so bounce sounds line up with the picture. Frames from remote video
    tracks registered with :meth:`add_remote_video_track` are Canny
    edge-detected and blended underneath the ball.
    """

    kind = "video"

    def __init__(
        self,
        clock: MediaClock,
        width: int = 320,
        height: int = 240,
        name: str = "",
        audio_track: Optional["SyntheticAudioTrack"] = None,
    ) -> None:
        """Create the track.

        Args:
            clock: Shared media clock used to pace frames and to time bounce events.
            width: Frame width in pixels.
            height: Frame height in pixels.
            name: Optional name; when non-empty it deterministically selects the ball colour.
            audio_track: Optional audio track whose ``add_bounce_event_at`` is
                called whenever the ball bounces off a wall.
        """
        super().__init__()
        self.width = width
        self.height = height
        self.name = name
        self.clock = clock
        self._next_frame_index = 0

        self.audio_track = audio_track
        self.remote_video_tracks: List[MediaStreamTrack] = []

        # BGR colour (OpenCV channel order) derived from the name, or a default green.
        self.ball_color = self._name_to_color(name) if name else (0, 255, 136)

        # Ball state:
        # - position and radius are in pixels;
        # - speed is in "metres" per second, where the frame width counts as 1 metre;
        # - the direction vector is normalised before use, so only its orientation matters.
        ball_radius = min(width, height) * 0.06
        self.ball = {
            "x": random.uniform(ball_radius, width - ball_radius),
            "y": random.uniform(ball_radius, height - ball_radius),
            "radius": ball_radius,
            "speed_mps": 0.5,
            "direction_x": random.uniform(-1.0, 1.0),
            "direction_y": random.uniform(-1.0, 1.0),
        }

        self.frame_count = 0
        self._start_time = time.time()
        self._last_frame_time = time.time()
        self.fps = 15

        # Latest processed remote image (edge-detected, resized to the frame) per
        # remote track. It is reused on frames where no new remote frame arrived,
        # so the overlay does not flicker.
        self._remote_latest: dict[MediaStreamTrack, npt.NDArray[Any]] = {}
        # One entry per remote track: (track, asyncio pump task, 1-slot queue of raw BGR frames).
        self._remote_tasks: List[Tuple[MediaStreamTrack, Any, Queue[npt.NDArray[Any]]]] = []

    def set_ball_speed(self, speed_mps: float) -> None:
        """Set the ball speed in "metres" per second (the frame width counts as 1 metre)."""
        self.ball["speed_mps"] = speed_mps

    def add_remote_video_track(self, track: MediaStreamTrack) -> None:
        """Start blending edge-detected frames from ``track`` into this track's output.

        Non-video tracks and tracks that are already registered are ignored. A
        background task keeps only the newest decoded frame in a 1-slot queue,
        so a slow consumer never accumulates latency.
        """
        if track.kind != "video" or track in self.remote_video_tracks:
            return
        self.remote_video_tracks.append(track)
        logger.info("Added remote video track: %s", track)
        q: Queue[npt.NDArray[Any]] = Queue(maxsize=1)

        async def pump() -> None:
            try:
                while True:
                    frame = await track.recv()
                    if not isinstance(frame, VideoFrame):
                        continue
                    img = frame.to_ndarray(format="bgr24")
                    # Drop the stale frame (if any) so only the newest is kept.
                    # There is no await between full() and get_nowait(), so no race.
                    if q.full():
                        q.get_nowait()
                    q.put_nowait(img)
            except Exception as exc:
                # Task boundary: recv() raises when the remote track ends (or on
                # error). Log and stop instead of leaving an unretrieved task exception.
                logger.info("Remote video track %s stopped: %r", track, exc)

        t = create_task(pump())
        self._remote_tasks.append((track, t, q))

    def remove_remote_video_track(self, track: MediaStreamTrack) -> None:
        """Stop blending ``track``: cancel its pump task and drop its cached frame."""
        if track in self.remote_video_tracks:
            self.remote_video_tracks.remove(track)
            logger.info("Removed remote video track: %s", track)
        remaining = []
        for entry in self._remote_tasks:
            if entry[0] is track:
                entry[1].cancel()
            else:
                remaining.append(entry)
        self._remote_tasks = remaining
        self._remote_latest.pop(track, None)

    def stop(self) -> None:
        """Cancel every remote frame pump, then stop this track."""
        for _track, task, _q in self._remote_tasks:
            task.cancel()
        self._remote_tasks.clear()
        self._remote_latest.clear()
        super().stop()

    def _calculate_velocity_components(self, dt: float) -> Tuple[float, float]:
        """Return the ball's (dx, dy) pixel displacement over ``dt`` seconds.

        The direction vector is normalised, so the speed does not depend on its
        length. A zero direction vector falls back to moving along +x.
        """
        dir_x = float(self.ball["direction_x"])
        dir_y = float(self.ball["direction_y"])
        mag = math.hypot(dir_x, dir_y)
        if mag == 0:
            dir_x_norm, dir_y_norm = 1.0, 0.0
        else:
            dir_x_norm, dir_y_norm = dir_x / mag, dir_y / mag
        # Frame width is treated as 1 metre.
        pixels_per_second = self.width * float(self.ball["speed_mps"])
        pixels_this_frame = pixels_per_second * dt
        return pixels_this_frame * dir_x_norm, pixels_this_frame * dir_y_norm

    async def next_timestamp(self) -> Tuple[int, float]:
        """Return ``(pts, time_base)`` for the next frame on a 90 kHz timeline."""
        pts = int(self.frame_count * (1 / self.fps) * 90000)
        time_base = 1 / 90000
        return pts, time_base

    def _name_to_color(self, name: str) -> Tuple[int, int, int]:
        """Map ``name`` deterministically to a (B, G, R) colour.

        The name is hashed with djb2 (32-bit). Hue, saturation and lightness are
        derived from the hash and then converted from HSL to RGB.
        """
        hash_value = 5381
        for ch in name:
            hash_value = ((hash_value << 5) + hash_value + ord(ch)) & 0xFFFFFFFF
        hue = hash_value % 360
        sat = 60 + (hash_value % 30)
        light = 45 + (hash_value % 30)
        h = hue / 360.0
        s = sat / 100.0
        lightness = light / 100.0
        c = (1 - abs(2 * lightness - 1)) * s
        x = c * (1 - abs((h * 6) % 2 - 1))
        m = lightness - c / 2
        if h < 1 / 6:
            r, g, b = c, x, 0
        elif h < 2 / 6:
            r, g, b = x, c, 0
        elif h < 3 / 6:
            r, g, b = 0, c, x
        elif h < 4 / 6:
            r, g, b = 0, x, c
        elif h < 5 / 6:
            r, g, b = x, 0, c
        else:
            r, g, b = c, 0, x
        # OpenCV draws in BGR order.
        return int((b + m) * 255), int((g + m) * 255), int((r + m) * 255)

    def _blend_remote_edges(self, frame_array: npt.NDArray[np.uint8]) -> npt.NDArray[np.uint8]:
        """Blend the latest edge image of each remote track into ``frame_array``.

        A newly arrived remote frame is processed (Canny edges, then resize)
        exactly once and cached. Frames without a fresh remote image reuse the
        cached one.
        """
        for track, _task, q in self._remote_tasks:
            if not q.empty():
                img = q.get_nowait()
                edges = cv2.cvtColor(cv2.Canny(img, 100, 200), cv2.COLOR_GRAY2BGR)
                if edges.shape[:2] != (self.height, self.width):
                    edges = cv2.resize(edges, (self.width, self.height))
                self._remote_latest[track] = edges
            cached = self._remote_latest.get(track)
            if cached is None:
                continue
            frame_array = cv2.addWeighted(frame_array, 0.7, cached, 0.3, 0.0).astype(np.uint8)
        return frame_array

    def _advance_ball(self, dt: float) -> bool:
        """Move the ball by ``dt`` seconds, reflect it off walls, and clamp it to the frame.

        A wall reflects the ball only while the ball is moving towards that wall.
        A ball resting against a wall (e.g. speed 0 right after a bounce) is
        therefore not re-bounced on every frame.

        Returns:
            True if the ball bounced off at least one wall during this step.
        """
        ball = self.ball
        dx, dy = self._calculate_velocity_components(dt)
        ball["x"] += dx
        ball["y"] += dy
        radius = ball["radius"]

        bounced = False
        if ball["x"] - radius <= 0 and ball["direction_x"] < 0:
            ball["direction_x"] = -ball["direction_x"]
            bounced = True
        elif ball["x"] + radius >= self.width and ball["direction_x"] > 0:
            ball["direction_x"] = -ball["direction_x"]
            bounced = True
        if ball["y"] - radius <= 0 and ball["direction_y"] < 0:
            ball["direction_y"] = -ball["direction_y"]
            bounced = True
        elif ball["y"] + radius >= self.height and ball["direction_y"] > 0:
            ball["direction_y"] = -ball["direction_y"]
            bounced = True

        ball["x"] = max(radius, min(self.width - radius, ball["x"]))
        ball["y"] = max(radius, min(self.height - radius, ball["y"]))
        return bounced

    async def recv(self) -> VideoFrame:
        """Render and return the next frame, paced against the shared clock."""
        frame_index = self._next_frame_index

        # Frame N is due at clock time N / fps. Sleep if ahead of schedule;
        # when behind, render immediately.
        target_t = frame_index / self.fps
        now = self.clock.now()
        if target_t > now:
            await sleep(target_t - now)
        self._next_frame_index += 1

        frame_array: npt.NDArray[np.uint8] = np.zeros((self.height, self.width, 3), dtype=np.uint8)
        frame_array = self._blend_remote_edges(frame_array)

        if self._advance_ball(1.0 / self.fps) and self.audio_track is not None:
            logger.info("Video: Bounce detected, triggering audio event")
            self.audio_track.add_bounce_event_at(self.clock.now())

        ball = self.ball
        cv2.circle(frame_array, (int(ball["x"]), int(ball["y"])), int(ball["radius"]), self.ball_color, -1)

        # NOTE: despite the "Frame" label, this shows wall-clock milliseconds modulo 10000.
        frame_text = f"Frame: {int(time.time() * 1000) % 10000}"
        speed_text = f"Speed: {ball['speed_mps']:.2f} m/s"
        cv2.putText(frame_array, frame_text, (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
        cv2.putText(frame_array, speed_text, (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

        frame = VideoFrame.from_ndarray(frame_array, format="bgr24")
        # Timestamps use a 90 kHz timeline (the RTP video clock rate).
        frame.pts = int(frame_index * (90000 / self.fps))
        frame.time_base = fractions.Fraction(1, 90000)

        self.frame_count += 1
        return frame
 | |
| 
 | |
| 
 | |
class SyntheticAudioTrack(MediaStreamTrack):
    """
    Synthetic stereo audio: a continuous tone that follows the ball, plus bounce effects.

    With a video track attached, the continuous tone follows the ball:
    - Pitch follows the ball's Y position: 400 Hz at the top (Y=0) down to
      200 Hz at the bottom (Y=height).
    - Stereo position follows the ball's X position (equal-power panning).
    - Volume follows the ball's Y position: 0.4 at the top down to 0.1 at the bottom.
    Without a video track, a centred 440 Hz tone at volume 0.35 is produced.

    Bounce events (see :meth:`add_bounce_event_at`) add a 200 ms, 600 Hz tone
    with a fast exponential decay. It is centred in both channels, on top of
    the continuous tone.

    Each frame holds 960 samples at 48 kHz (20 ms) of packed s16 stereo. Output
    is paced against the shared :class:`MediaClock`, so sample ``n`` is due at
    clock time ``n / sample_rate``. This is the same timeline the video track
    uses when it reports bounces.
    """

    kind = "audio"

    def __init__(
        self, clock: MediaClock, video_track: "AnimatedVideoTrack | None" = None
    ):
        super().__init__()
        self.sample_rate = 48000
        self.samples_per_frame = 960
        self._samples_generated = 0
        self._active_bounces: list[BounceEvent] = []  # pending/playing bounce events
        self.video_track = video_track  # source of ball position (may be None)
        self.clock = clock
        # Running phase (radians) of the continuous tone. It is carried across
        # frames so a pitch change between frames does not cause a waveform
        # discontinuity (an audible click).
        self._tone_phase = 0.0

    def add_bounce_event_at(self, bounce_time_s: float) -> None:
        """Schedule a 200 ms bounce sound at ``bounce_time_s`` seconds on the shared clock.

        If that moment has already been rendered into an earlier frame, the
        bounce starts at the next unrendered sample instead, so its attack is
        not cut off.
        """
        start_sample = max(int(bounce_time_s * self.sample_rate), self._samples_generated)
        duration = int(0.2 * self.sample_rate)
        self._active_bounces.append(
            {
                "type": "bounce",
                "start_sample": start_sample,
                "end_sample": start_sample + duration,
            }
        )

    def _get_ball_frequency(self) -> float:
        """Return the continuous-tone pitch in Hz for the ball's current Y position.

        Y is mapped linearly from 400 Hz at the top (Y=0) to 200 Hz at the
        bottom (Y=height). Returns 440 Hz when no video track is attached.
        """
        if not self.video_track:
            return 440.0  # Default frequency if no video track

        ball_y = self.video_track.ball["y"]
        height = self.video_track.height

        # Normalize Y position (0.0 at top, 1.0 at bottom)
        normalized_y = ball_y / height

        # Inverted mapping: top = high frequency, bottom = low frequency
        freq_min = 200.0
        freq_max = 400.0
        frequency = freq_max - (normalized_y * (freq_max - freq_min))

        return frequency

    def _generate_bounce_sample(self, t: float) -> float:
        """Return one bounce sample at time ``t`` seconds into the bounce.

        The sample is a 400 Hz tone with an exp(-10 t) envelope, scaled by 0.9,
        and is zero outside [0, 0.2] s. It is not used by :meth:`recv`, which
        renders its own 600 Hz bounce in vectorised form.
        """
        if t < 0 or t > 0.2:
            return 0.0

        decay = np.exp(-t * 10)
        freq = 400
        sound = np.sin(2 * np.pi * freq * t) * decay

        return sound * 0.9

    async def next_timestamp(self) -> tuple[int, float]:
        """Return ``(pts, time_base)``: pts counts samples and time_base is 1 / sample_rate."""
        pts = self._samples_generated
        time_base = 1 / self.sample_rate
        return pts, time_base

    def _render_base_tone(self, num_samples: int) -> npt.NDArray[np.float32]:
        """Return ``num_samples`` of the continuous sine tone at the ball-driven pitch.

        The phase is accumulated across calls, so the waveform stays continuous
        when the pitch changes from one frame to the next.
        """
        base_freq = self._get_ball_frequency()  # 440 Hz when no video track
        phase_step = 2 * math.pi * base_freq / self.sample_rate
        phases = self._tone_phase + phase_step * np.arange(num_samples)
        self._tone_phase = (self._tone_phase + phase_step * num_samples) % (2 * math.pi)
        return np.sin(phases).astype(np.float32)

    def _render_bounces(self, first_sample: int, num_samples: int) -> npt.NDArray[np.float32]:
        """Render every bounce that overlaps samples [first_sample, first_sample + num_samples).

        Each bounce is a 600 Hz tone with amplitude 0.8 and an exp(-20 t)
        envelope. It is placed sample-accurately from its ``start_sample`` and
        cut off at its ``end_sample``; overlapping bounces are summed. Events
        that end within this frame are removed afterwards.
        """
        out = np.zeros(num_samples, dtype=np.float32)
        frame_end = first_sample + num_samples
        sample_idx = np.arange(first_sample, frame_end)
        for bounce in self._active_bounces:
            start, end = bounce["start_sample"], bounce["end_sample"]
            if end <= first_sample or start >= frame_end:
                continue
            tb = (sample_idx - start) / self.sample_rate
            active = (sample_idx >= start) & (sample_idx < end)
            wave = 0.8 * np.sin(2 * np.pi * 600.0 * tb) * np.exp(-tb * 20.0)
            out += np.where(active, wave, 0.0).astype(np.float32)

        # Keep only events that still have samples left to play in later frames.
        self._active_bounces = [b for b in self._active_bounces if b["end_sample"] > frame_end]
        return out

    def _pan_gains(self) -> Tuple[float, float]:
        """Return equal-power (left, right) gains from the ball's X position (centred if none)."""
        if self.video_track:
            pan = self.video_track.ball["x"] / self.video_track.width  # 0 = left, 1 = right
        else:
            pan = 0.5
        angle = pan * math.pi / 2
        return math.cos(angle), math.sin(angle)

    def _tone_volume(self) -> float:
        """Return the base-tone volume: 0.4 at the top of the frame to 0.1 at the bottom."""
        if not self.video_track:
            return 0.35  # Mid-range volume if no video track
        normalized_y = self.video_track.ball["y"] / self.video_track.height
        return 0.4 - (normalized_y * 0.3)

    async def recv(self) -> AudioFrame:
        """
        Generate the next audio frame (packed s16 stereo).

        Pipeline:
        1. Pace against the shared clock.
        2. Base tone at a pitch driven by the ball's Y position (phase-continuous).
        3. Bounce effects, rendered sample-accurately and centred in both channels.
        4. Equal-power stereo panning of the base tone from the ball's X position.
        5. Volume scaling of the base tone from the ball's Y position.
        6. Mix, scale down peaks above 0.95, then convert to interleaved int16.
        """
        pts, _time_base = await self.next_timestamp()
        num_samples = self.samples_per_frame

        # --- 1. PACING ---
        # Sample ``pts`` is due at clock time pts / sample_rate. Mirror
        # AnimatedVideoTrack.recv: sleep when ahead of the clock, and render
        # immediately when behind. aiortc tracks are expected to pace
        # themselves in recv() (as aiortc's own AudioStreamTrack does).
        target_t = pts / self.sample_rate
        now = self.clock.now()
        if target_t > now:
            await sleep(target_t - now)

        # --- 2-5. SYNTHESIS ---
        base_samples = self._render_base_tone(num_samples)
        bounce_samples = self._render_bounces(pts, num_samples)
        left_gain, right_gain = self._pan_gains()
        volume = self._tone_volume()

        # Base tone is panned and scaled; bounces are added equally to both channels.
        left_total = base_samples * (left_gain * volume) + bounce_samples
        right_total = base_samples * (right_gain * volume) + bounce_samples

        # --- 6. CLIPPING PREVENTION ---
        # Scale the whole frame down if the peak exceeds 0.95, leaving 5% headroom.
        peak = float(max(np.max(np.abs(left_total)), np.max(np.abs(right_total))))
        if peak > 0.95:
            factor = 0.95 / peak
            left_total *= factor
            right_total *= factor
            logger.debug(
                "Audio normalization applied: peak=%.3f, factor=%.3f", peak, factor
            )

        # Hard clip as a final safety net, then convert to int16.
        left = np.clip(left_total * 32767, -32767, 32767).astype(np.int16)
        right = np.clip(right_total * 32767, -32767, 32767).astype(np.int16)

        # Packed s16 stereo: samples interleaved as [L, R, L, R, ...] in a single plane.
        interleaved = np.empty(num_samples * 2, dtype=np.int16)
        interleaved[0::2] = left
        interleaved[1::2] = right

        frame = AudioFrame.from_ndarray(interleaved.reshape(1, -1), format="s16", layout="stereo")
        frame.sample_rate = self.sample_rate
        frame.pts = pts
        frame.time_base = fractions.Fraction(1, self.sample_rate)
        self._samples_generated += num_samples
        return frame
 | |
| 
 | |
| 
 | |
def create_synthetic_tracks(session_name: str) -> dict[str, MediaStreamTrack]:
    """
    Create synthetic audio and video tracks for WebRTC streaming.

    Args:
        session_name: Name used to derive the video track's ball colour.

    Returns:
        Dictionary with a 'video' (AnimatedVideoTrack) and an 'audio'
        (SyntheticAudioTrack) entry. The two tracks reference each other and
        share one MediaClock.

    Note:
        - To change ball speed, use: tracks["video"].set_ball_speed(speed_in_mps)
          where speed_in_mps is meters per second (frame width = 1 meter)
        - Audio generates a continuous tone based on ball Y position (200-400Hz)
        - Bounce events add additional audio on top of the continuous tone
    """
    # A single clock keeps audio and video on the same timeline.
    media_clock = MediaClock()

    # Create video track first
    video_track = AnimatedVideoTrack(name=session_name, clock=media_clock)

    # The audio track reads the ball position from the video track for pitch, pan and volume.
    audio_track = SyntheticAudioTrack(video_track=video_track, clock=media_clock)

    # The video track notifies the audio track of bounce events.
    video_track.audio_track = audio_track

    return {"video": video_track, "audio": audio_track}
 | |
| 
 | |
| 
 | |
# Agent descriptor exported for dynamic discovery by the FastAPI service.
# Both values are returned as-is by agent_info().
AGENT_NAME: str = "synthetic_media"
AGENT_DESCRIPTION: str = (
    "Synthetic audio and video tracks (AnimatedVideoTrack + SyntheticAudioTrack)"
)
 | |
| 
 | |
def agent_info() -> dict[str, str]:
    """Describe this agent for service discovery.

    Returns:
        A mapping with ``name`` (AGENT_NAME) and ``description`` (AGENT_DESCRIPTION).
    """
    info: dict[str, str] = dict(name=AGENT_NAME, description=AGENT_DESCRIPTION)
    return info
 | |
| 
 | |
def create_agent_tracks(session_name: str) -> dict[str, MediaStreamTrack]:
    """Build this agent's media tracks; this is the factory the FastAPI service calls.

    Args:
        session_name: Passed through unchanged to ``create_synthetic_tracks``.

    Returns:
        The ``{"video": ..., "audio": ...}`` mapping from ``create_synthetic_tracks``.
    """
    tracks = create_synthetic_tracks(session_name)
    return tracks
 |