Added smoke test

This commit is contained in:
James Ketr 2025-09-08 16:15:07 -07:00
parent ce012e253c
commit fd798c98aa

View File

@ -0,0 +1,131 @@
#!/usr/bin/env python3
"""
Smoke test: two browser-like peers join a lobby with a bot and should receive addPeer messages.
This test is intentionally simple and fast. It connects two websocket clients to the
server WebSocket endpoint, sets distinct names, joins the same lobby, and asserts
that each client receives at least one `addPeer` message (indicating the server
attempted to establish WebRTC peers). The bot runs in a separate container; the
test only verifies that the server signaled peers to the connected clients.
"""
import asyncio
import json
import os
import sys
import ssl
import websockets
import httpx
BASE_WS = os.environ.get("AI_VOICEBOT_WS") or "wss://localhost:8000/ai-voicebot/ws/lobby"
def _ssl_ctx_insecure():
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return ctx
async def simple_client(name, lobby_id, recv_queue):
# Create session via HTTP API in same container (server listens on 8000)
async with httpx.AsyncClient(verify=False) as client:
resp = await client.get("https://localhost:8000/ai-voicebot/api/session")
resp.raise_for_status()
session_id = resp.json().get("id")
# Connect to lobby websocket path for this session
ws_uri = f"{BASE_WS}/{lobby_id}/{session_id}"
ssl_ctx = _ssl_ctx_insecure()
async with websockets.connect(ws_uri, ssl=ssl_ctx) as ws:
# set name
await ws.send(json.dumps({"type": "set_name", "data": {"name": name}}))
# consume update_name
try:
msg = await asyncio.wait_for(ws.recv(), timeout=2)
except Exception:
return
# set name
await ws.send(json.dumps({"type": "set_name", "data": {"name": name}}))
# consume update_name
try:
msg = await asyncio.wait_for(ws.recv(), timeout=2)
except Exception:
return
# join lobby
await ws.send(json.dumps({"type": "join", "data": {}}))
# read messages for a short window and forward addPeer events
end_at = asyncio.get_event_loop().time() + 4
while asyncio.get_event_loop().time() < end_at:
try:
msg = await asyncio.wait_for(ws.recv(), timeout=1)
except asyncio.TimeoutError:
continue
try:
parsed = json.loads(msg)
except Exception:
continue
if parsed.get("type") == "addPeer":
await recv_queue.put(parsed)
async def run_test():
# Create two sessions and a lobby using the API
async with httpx.AsyncClient(verify=False) as client:
r1 = await client.get("https://localhost:8000/ai-voicebot/api/session")
r1.raise_for_status()
s1 = r1.json().get("id")
r2 = await client.get("https://localhost:8000/ai-voicebot/api/session")
r2.raise_for_status()
s2 = r2.json().get("id")
# Create lobby using session 1
lobby_req = {"type": "lobby_create", "data": {"name": "smoke-lobby", "private": False}}
r3 = await client.post(f"https://localhost:8000/ai-voicebot/api/lobby/{s1}", json=lobby_req)
r3.raise_for_status()
lobby_id = r3.json().get("data", {}).get("id")
recv_q1 = asyncio.Queue()
recv_q2 = asyncio.Queue()
# start two websocket clients tied to the created sessions
client1 = asyncio.create_task(simple_client("smoke-1", lobby_id, recv_q1))
await asyncio.sleep(0.2)
client2 = asyncio.create_task(simple_client("smoke-2", lobby_id, recv_q2))
await asyncio.wait_for(asyncio.gather(client1, client2), timeout=20)
got1 = False
got2 = False
try:
while True:
_ = recv_q1.get_nowait()
got1 = True
except Exception:
pass
try:
while True:
_ = recv_q2.get_nowait()
got2 = True
except Exception:
pass
print(f"Client1 got addPeer={got1}, Client2 got addPeer={got2}")
return got1 and got2
if __name__ == "__main__":
ok = False
try:
ok = asyncio.run(run_test())
except Exception as e:
print(f"Test failed: {e}")
sys.exit(1)
sys.exit(0 if ok else 2)