#!/usr/bin/env python3
"""
Smoke test: two browser-like peers join a lobby with a bot and should receive
addPeer messages.

This test is intentionally simple and fast. It connects two websocket clients
to the server WebSocket endpoint, sets distinct names, joins the same lobby,
and asserts that each client receives at least one `addPeer` message
(indicating the server attempted to establish WebRTC peers).

The bot runs in a separate container; the test only verifies that the server
signaled peers to the connected clients.

Environment variables:
    AI_VOICEBOT_WS   Base lobby WebSocket URL
                     (default: wss://localhost:8000/ai-voicebot/ws/lobby).
    AI_VOICEBOT_API  Base HTTP API URL
                     (default: https://localhost:8000/ai-voicebot/api).

Exit codes:
    0  both clients received at least one addPeer message
    1  the test raised an exception (HTTP error, connection failure, timeout)
    2  at least one client received no addPeer message
"""

import asyncio
import json
import os
import ssl
import sys

import httpx
import websockets

BASE_WS = os.environ.get("AI_VOICEBOT_WS") or "wss://localhost:8000/ai-voicebot/ws/lobby"
BASE_API = os.environ.get("AI_VOICEBOT_API") or "https://localhost:8000/ai-voicebot/api"

# All durations are in seconds.
NAME_ACK_TIMEOUT = 2     # how long to wait for the reply to set_name
LISTEN_WINDOW = 4        # how long each client listens after joining
RECV_POLL_TIMEOUT = 1    # per-recv timeout while listening
CLIENT_STAGGER = 0.2     # delay between starting the two clients
OVERALL_TIMEOUT = 20     # upper bound for both clients to finish


def _ssl_ctx_insecure() -> ssl.SSLContext:
    """Return an SSL context with hostname and certificate checks disabled.

    Verification is turned off on purpose so the test can talk to a server
    using a certificate that would not validate. Do not reuse this context
    outside of test code.
    """
    ctx = ssl.create_default_context()
    # check_hostname must be disabled before verify_mode can be CERT_NONE.
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    return ctx


def _require(value, what: str):
    """Return *value*, or raise RuntimeError if the server omitted it."""
    if not value:
        raise RuntimeError(f"server response did not include {what}")
    return value


async def _create_session(client: httpx.AsyncClient) -> str:
    """Create a session through the HTTP API and return its id.

    Raises:
        httpx.HTTPStatusError: the server answered with an error status.
        RuntimeError: the response body carried no ``id``.
    """
    resp = await client.get(f"{BASE_API}/session")
    resp.raise_for_status()
    return _require(resp.json().get("id"), "a session id")


async def simple_client(name: str, lobby_id: str, recv_queue: asyncio.Queue) -> None:
    """Join *lobby_id* as *name* and forward every addPeer message received.

    The client creates its own session, opens the lobby websocket, sets its
    name, joins the lobby and then listens for ``LISTEN_WINDOW`` seconds.
    Each message whose ``type`` is ``"addPeer"`` is put on *recv_queue*.

    If no reply to ``set_name`` arrives, the reason is reported on stderr and
    the client returns without joining (the queue stays empty).

    Args:
        name: Display name sent in the ``set_name`` message.
        lobby_id: Id of the lobby to connect to.
        recv_queue: Queue that receives the parsed addPeer messages.
    """
    # TLS verification is disabled for the HTTP call as well (see
    # _ssl_ctx_insecure for the websocket side).
    async with httpx.AsyncClient(verify=False) as client:
        session_id = await _create_session(client)

    ws_uri = f"{BASE_WS}/{lobby_id}/{session_id}"
    async with websockets.connect(ws_uri, ssl=_ssl_ctx_insecure()) as ws:
        # Set the name once and consume the server's reply to it.
        await ws.send(json.dumps({"type": "set_name", "data": {"name": name}}))
        try:
            await asyncio.wait_for(ws.recv(), timeout=NAME_ACK_TIMEOUT)
        except Exception as exc:
            # Deliberately best-effort: a missing reply ends this client and
            # shows up as "got addPeer=False", but say why instead of
            # returning silently.
            print(f"{name}: no reply to set_name ({exc!r}); giving up", file=sys.stderr)
            return

        await ws.send(json.dumps({"type": "join", "data": {}}))

        # Read messages for a short window and forward addPeer events.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + LISTEN_WINDOW
        while loop.time() < deadline:
            try:
                raw = await asyncio.wait_for(ws.recv(), timeout=RECV_POLL_TIMEOUT)
            except asyncio.TimeoutError:
                continue
            try:
                message = json.loads(raw)
            except ValueError:
                # Not JSON (JSONDecodeError / UnicodeDecodeError): ignore it.
                continue
            # Ignore valid JSON that is not an object (list, number, ...).
            if isinstance(message, dict) and message.get("type") == "addPeer":
                await recv_queue.put(message)


async def run_test() -> bool:
    """Create a lobby, run two clients in it, and report whether both saw addPeer.

    Returns:
        True when each client received at least one addPeer message.

    Raises:
        httpx.HTTPStatusError: a session or lobby request failed.
        RuntimeError: the server response lacked a session or lobby id.
        asyncio.TimeoutError: the clients did not finish within
            ``OVERALL_TIMEOUT`` seconds.
    """
    # The lobby is created with a session of its own; each websocket client
    # creates a separate session inside simple_client().
    async with httpx.AsyncClient(verify=False) as client:
        owner_session = await _create_session(client)
        lobby_req = {"type": "lobby_create", "data": {"name": "smoke-lobby", "private": False}}
        resp = await client.post(f"{BASE_API}/lobby/{owner_session}", json=lobby_req)
        resp.raise_for_status()
        lobby_id = _require((resp.json().get("data") or {}).get("id"), "a lobby id")

    recv_q1 = asyncio.Queue()
    recv_q2 = asyncio.Queue()

    # Start the two clients slightly apart so the first is already connected
    # when the second one arrives.
    client1 = asyncio.create_task(simple_client("smoke-1", lobby_id, recv_q1))
    await asyncio.sleep(CLIENT_STAGGER)
    client2 = asyncio.create_task(simple_client("smoke-2", lobby_id, recv_q2))

    await asyncio.wait_for(asyncio.gather(client1, client2), timeout=OVERALL_TIMEOUT)

    # Both clients are done, so a non-empty queue means at least one addPeer.
    got1 = not recv_q1.empty()
    got2 = not recv_q2.empty()

    print(f"Client1 got addPeer={got1}, Client2 got addPeer={got2}")
    return got1 and got2


def main() -> int:
    """Run the smoke test and return the process exit code (0, 1 or 2)."""
    try:
        ok = asyncio.run(run_test())
    except Exception as e:
        # repr() keeps the exception type visible; str() of a timeout is empty.
        print(f"Test failed: {e!r}")
        return 1
    return 0 if ok else 2


if __name__ == "__main__":
    sys.exit(main())