Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 252 additions & 0 deletions examples/realtime_agent/realtime_agent.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Building an Interruptible Customer Support Agent with OpenAI Realtime API\n",
"\n",
"This notebook demonstrates how to build a voice-based customer support agent using the **OpenAI Realtime API (Beta)**.\n",
"\n",
"## Key Features Demonstrated\n",
"1. **WebSocket Connection**: Persistent, low-latency stateful connection.\n",
"2. **Tool Use**: The agent can look up mock data (Order Status).\n",
"3. **Interruption Handling**: The client handles `input_audio_buffer.speech_started` to cancel the AI's response when the user interrupts.\n",
"\n",
"## Prerequisites\n",
"- OpenAI API Key with access to `gpt-4o-realtime-preview`.\n",
"- Python 3.10+\n",
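"- `websockets` (third-party; install with `pip install websockets`). Version 14 or newer is assumed, because the code passes `additional_headers` to `websockets.connect`. `asyncio` is part of the Python standard library, and `numpy` is optional: the code below does not import it.\n"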
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"import json\n",
"import logging\n",
"import websockets\n",
"import traceback\n",
"\n",
"# Configuring logging to see the events live\n",
"logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\n",
"logger = logging.getLogger(\"RealtimeAgent\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Define Tools\n",
"\n",
"We define a simple tool `get_order_status` to look up orders in a mock database."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Mock Data\n",
"MOCK_DB = {\n",
" \"order_123\": {\"status\": \"shipped\", \"delivery_date\": \"2024-12-20\"},\n",
" \"order_456\": {\"status\": \"processing\", \"delivery_date\": \"2024-12-25\"},\n",
"}\n",
"\n",
"def get_order_status(order_id: str):\n",
" \"\"\"\n",
" Look up an order by ID.\n",
" \"\"\"\n",
" print(f\"\\n[Tool] Looking up order {order_id}...\")\n",
" result = MOCK_DB.get(order_id, {\"status\": \"not_found\"})\n",
" return json.dumps(result)\n",
"\n",
"tools_schema = [\n",
" {\n",
" \"type\": \"function\",\n",
" \"name\": \"get_order_status\",\n",
" \"description\": \"Get the status of a customer order\",\n",
" \"parameters\": {\n",
" \"type\": \"object\",\n",
" \"properties\": {\n",
" \"order_id\": {\n",
" \"type\": \"string\",\n",
" \"description\": \"The order ID, e.g. order_123\"\n",
" }\n",
" },\n",
" \"required\": [\"order_id\"]\n",
" }\n",
" }\n",
"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. The Realtime Agent Class\n",
"\n",
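"This class manages the WebSocket connection, configures the session, and dispatches incoming server events (interruptions, tool calls, errors). Audio playback is left as a stub: `response.audio.delta` events are received but not played, and no microphone audio is sent."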
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class RealtimeAgent:\n",
" def __init__(self, url=\"wss://api.openai.com/v1/realtime\", api_key=None, model=\"gpt-4o-realtime-preview-2024-10-01\"):\n",
" self.url = f\"{url}?model={model}\"\n",
" self.api_key = api_key\n",
" self.ws = None\n",
" self.should_stop = False\n",
"\n",
" async def connect(self):\n",
" headers = {\n",
" \"Authorization\": f\"Bearer {self.api_key}\",\n",
" \"OpenAI-Beta\": \"realtime=v1\"\n",
" }\n",
" try:\n",
" logger.info(f\"Connecting to {self.url}...\")\n",
" self.ws = await websockets.connect(self.url, additional_headers=headers)\n",
" logger.info(\"Connected!\")\n",
" await self.initialize_session()\n",
" except Exception as e:\n",
" logger.error(f\"Connection failed: {e}\")\n",
" raise\n",
"\n",
" async def initialize_session(self):\n",
" \"\"\"Send initial session configuration.\"\"\"\n",
" event = {\n",
" \"type\": \"session.update\",\n",
" \"session\": {\n",
" \"modalities\": [\"text\", \"audio\"],\n",
" \"instructions\": \"You are a helpful customer support agent. Check order status when asked.\",\n",
" \"voice\": \"alloy\",\n",
" \"turn_detection\": {\"type\": \"server_vad\"},\n",
" \"tools\": tools_schema,\n",
" \"tool_choice\": \"auto\",\n",
" }\n",
" }\n",
" await self.send_event(event)\n",
"\n",
" async def send_event(self, event):\n",
" if self.ws:\n",
" await self.ws.send(json.dumps(event))\n",
"\n",
" async def run_loop(self):\n",
" \"\"\"Main loop to receive messages.\"\"\"\n",
" try:\n",
" async for message in self.ws:\n",
" if self.should_stop: break\n",
" await self.handle_message(json.loads(message))\n",
" except Exception as e:\n",
" logger.error(f\"Loop error: {e}\")\n",
"\n",
" async def handle_message(self, data):\n",
" event_type = data.get(\"type\")\n",
" \n",
" if event_type == \"input_audio_buffer.speech_started\":\n",
" logger.warning(\"[INTERRUPTION] User started speaking! Canceling current response.\")\n",
" await self.send_event({\"type\": \"response.cancel\"})\n",
" \n",
" elif event_type == \"response.function_call_arguments.done\":\n",
" # Execute the tool\n",
" call_id = data.get(\"call_id\")\n",
" name = data.get(\"name\")\n",
" args = json.loads(data.get(\"arguments\"))\n",
" \n",
" if name == \"get_order_status\":\n",
" result = get_order_status(args.get(\"order_id\"))\n",
" # Send output back\n",
" await self.send_event({\n",
" \"type\": \"conversation.item.create\",\n",
" \"item\": {\n",
" \"type\": \"function_call_output\",\n",
" \"call_id\": call_id,\n",
" \"output\": result\n",
" }\n",
" })\n",
" # Trigger model to read the result\n",
" await self.send_event({\"type\": \"response.create\"})\n",
" \n",
" elif event_type == \"response.audio.delta\":\n",
" # Here you would play audio bytes\n",
" pass\n",
" \n",
" elif event_type == \"error\":\n",
" logger.error(f\"Error: {data.get('error')}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. Running the Agent\n",
"\n",
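"Replace the `YOUR_API_KEY_HERE` placeholder below with your key, then uncomment the final `await main()` line. `main()` connects, keeps the agent listening for 10 seconds to demonstrate the connection, and then closes the WebSocket."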
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# REPLACE WITH YOUR KEY\n",
"API_KEY = \"YOUR_API_KEY_HERE\"\n",
"\n",
"async def main():\n",
" if \"YOUR_API_KEY\" in API_KEY:\n",
" print(\"Please set a valid API Key first!\")\n",
" return\n",
"\n",
" agent = RealtimeAgent(api_key=API_KEY)\n",
" await agent.connect()\n",
" \n",
" # Run the listener in background\n",
" listener_task = asyncio.create_task(agent.run_loop())\n",
" \n",
" try:\n",
" # Simulate a conversation starter if you like, or just wait for VAD\n",
" # For this demo, we just keep it alive for 10 seconds\n",
" print(\"Agent is listening (server VAD)... Speak now if you have a mic setup!\")\n",
" print(\"(Or manually trigger response.create if using mock)\")\n",
" await asyncio.sleep(10)\n",
" finally:\n",
" agent.should_stop = True\n",
" await agent.ws.close()\n",
" await listener_task\n",
"\n",
"# In Jupyter, we can await main directly\n",
"# await main()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.0"
}
},
"nbformat": 4,
"nbformat_minor": 5
}