ai-voicebot/tests/test-webrtc-signaling.py

72 lines
2.3 KiB
Python

#!/usr/bin/env python3
"""
Test WebRTC signaling handlers registration
"""
import asyncio
import json
import websockets
import sys
async def test_webrtc_handlers():
"""Test that WebRTC signaling handlers are properly registered"""
try:
# Connect to the WebSocket endpoint
uri = "ws://localhost:8000/ai-voicebot/ws"
async with websockets.connect(uri) as websocket:
print("Connected to WebSocket")
# Send a set_name message first
await websocket.send(json.dumps({
"type": "set_name",
"data": {"name": "test_user"}
}))
response = await websocket.recv()
print(f"Set name response: {response}")
# Test relayICECandidate handler
test_message = {
"type": "relayICECandidate",
"data": {
"peer_id": "nonexistent_peer",
"candidate": {"candidate": "test"}
}
}
await websocket.send(json.dumps(test_message))
print("Sent relayICECandidate message")
# Expect an error response since we're not in a lobby
response = await websocket.recv()
print(f"ICE candidate response: {response}")
# Test relaySessionDescription handler
test_message = {
"type": "relaySessionDescription",
"data": {
"peer_id": "nonexistent_peer",
"session_description": {"type": "offer"}
}
}
await websocket.send(json.dumps(test_message))
print("Sent relaySessionDescription message")
# Expect an error response since we're not in a lobby
response = await websocket.recv()
print(f"Session description response: {response}")
print("WebRTC signaling handlers are working!")
except Exception as e:
print(f"Error testing WebRTC handlers: {e}")
return False
return True
if __name__ == "__main__":
success = asyncio.run(test_webrtc_handlers())
sys.exit(0 if success else 1)