#!/usr/bin/env python3
"""
Test WebRTC signaling handlers registration

Connects to the voicebot WebSocket endpoint, sets a user name, then sends a
``relayICECandidate`` and a ``relaySessionDescription`` message and prints
whatever the server sends back for each.  The process exits with status 0
when every exchange completes and 1 otherwise.
"""

import asyncio
import json
import sys

import websockets

# Endpoint used when the caller does not supply one.
DEFAULT_URI = "ws://localhost:8000/ai-voicebot/ws"

# Seconds to wait for each server reply before treating the run as failed.
DEFAULT_TIMEOUT = 10.0


async def _recv_with_timeout(websocket, timeout, awaiting):
    """Return the next message from *websocket*, waiting at most *timeout* seconds.

    Args:
        websocket: An open connection exposing an awaitable ``recv()``.
        timeout: Maximum seconds to wait; ``None`` waits indefinitely.
        awaiting: Short label naming the expected reply, used in the error text.

    Raises:
        TimeoutError: If nothing arrives within *timeout* seconds.
    """
    try:
        return await asyncio.wait_for(websocket.recv(), timeout=timeout)
    except asyncio.TimeoutError:
        # asyncio's timeout carries no message of its own; re-raise with one
        # so the caller's error output says which reply never arrived.
        raise TimeoutError(
            f"no {awaiting} response within {timeout} seconds"
        ) from None


async def test_webrtc_handlers(uri=DEFAULT_URI, timeout=DEFAULT_TIMEOUT):
    """Test that WebRTC signaling handlers are properly registered

    Args:
        uri: WebSocket endpoint to connect to.
        timeout: Maximum seconds to wait for each server reply; ``None``
            disables the limit.

    Returns:
        True if every message was sent and a reply was received for each,
        False if any exception (including a reply timeout) occurred.
    """
    try:
        # Connect to the WebSocket endpoint
        async with websockets.connect(uri) as websocket:
            print("Connected to WebSocket")

            # Send a set_name message first
            await websocket.send(json.dumps({
                "type": "set_name",
                "data": {"name": "test_user"},
            }))
            response = await _recv_with_timeout(websocket, timeout, "set_name")
            print(f"Set name response: {response}")

            # Test relayICECandidate handler
            test_message = {
                "type": "relayICECandidate",
                "data": {
                    "peer_id": "nonexistent_peer",
                    "candidate": {"candidate": "test"},
                },
            }
            await websocket.send(json.dumps(test_message))
            print("Sent relayICECandidate message")

            # Expect an error response since we're not in a lobby
            response = await _recv_with_timeout(
                websocket, timeout, "relayICECandidate"
            )
            print(f"ICE candidate response: {response}")

            # Test relaySessionDescription handler
            test_message = {
                "type": "relaySessionDescription",
                "data": {
                    "peer_id": "nonexistent_peer",
                    "session_description": {"type": "offer"},
                },
            }
            await websocket.send(json.dumps(test_message))
            print("Sent relaySessionDescription message")

            # Expect an error response since we're not in a lobby
            response = await _recv_with_timeout(
                websocket, timeout, "relaySessionDescription"
            )
            print(f"Session description response: {response}")

            print("WebRTC signaling handlers are working!")
    except Exception as e:
        # Top-level boundary of a smoke test: report any failure (connection
        # refused, closed socket, reply timeout) and signal it via the result.
        print(f"Error testing WebRTC handlers: {e}")
        return False
    return True


if __name__ == "__main__":
    success = asyncio.run(test_webrtc_handlers())
    sys.exit(0 if success else 1)