Minimal Server Example
Implement a minimal relay server in Python with FastAPI.
FastAPI scaffolding and authentication
    from __future__ import annotations

    import asyncio
    import os
    import uuid

    from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
    from fastapi.responses import JSONResponse

    # The single active client connection, or None when no client is connected.
    connection = None
    # Futures awaiting a response frame, keyed by request_id.
    pending: dict[str, asyncio.Future[dict]] = {}

    API_KEY = os.environ.get("API_KEY")
    if API_KEY is None:
        raise SystemExit("set API_KEY")

    app = FastAPI()
The server tracks the active WebSocket connection and a dictionary of pending request futures.
It reads a static API key from the API_KEY environment variable at startup for authenticating clients.
This single static API key setup is deliberately simple and intended only for this demo or for local testing. It is not secure enough for production use and supports neither multiple clients nor dynamic provisioning. For production deployments, see Relay Server.
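The Bearer check in this setup amounts to comparing one header value against one secret. The following is a minimal sketch of that check as a standalone helper. The function name `is_authorized` is hypothetical and not part of the server code, and the use of `hmac.compare_digest` (a constant-time comparison from the standard library) is a suggested hardening, not something the demo server does.

```python
import hmac


def is_authorized(auth_header: str, api_key: str) -> bool:
    """Return True if auth_header carries api_key as a Bearer token.

    Hypothetical helper for illustration only.
    """
    prefix = "Bearer "
    if not auth_header.startswith(prefix):
        return False
    token = auth_header[len(prefix):].strip()
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(token.encode(), api_key.encode())
```

A handler could call `is_authorized(ws.headers.get("authorization", ""), API_KEY)` in place of a plain string comparison.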
WebSocket /connect
This is the only endpoint exposed to the outside. A spectral-bridge client connects here and presents its API key as a Bearer token in the Authorization header.
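    @app.websocket("/connect")
    async def connect(ws: WebSocket):
        global connection  # rebound below, so it must be declared global

        # Accept first: a close sent before accept() rejects the handshake
        # with HTTP 403, and the client never sees the 4001 close code.
        await ws.accept()
        auth = ws.headers.get("authorization", "")
        if not auth.startswith("Bearer ") or auth.removeprefix("Bearer ").strip() != API_KEY:
            await ws.close(code=4001, reason="invalid api key")
            return

        # Only one client at a time: a new connection replaces the old one.
        if connection is not None:
            try:
                await connection.close(code=1000, reason="replaced by newer connection")
            except Exception:
                pass
        connection = ws
        await ws.send_json({"type": "connected"})

        try:
            while True:
                data = await ws.receive_json()
                if data.get("type") == "response":
                    request_id = data.get("request_id")
                    future = pending.get(request_id)
                    if future and not future.done():
                        future.set_result(data.get("payload", {}))
        except WebSocketDisconnect:
            pass
        finally:
            # Forget this socket unless a newer connection already replaced it.
            if connection is ws:
                connection = None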
If the key is missing or invalid, the server closes the connection with code 4001.
On success, it sends a {"type": "connected"} frame and begins listening for response frames from the client.
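To make the frame protocol concrete, here is a sketch of what a client might send back for a request frame. The field names (`type`, `request_id`, `payload`, `status`, `body`) are the ones the server code in this section reads. The helper name `make_response_frame` is hypothetical, and a real spectral-bridge client may build its frames differently.

```python
from __future__ import annotations

from typing import Any


def make_response_frame(
    request_frame: dict[str, Any], status: int, body: dict[str, Any]
) -> dict[str, Any]:
    """Build the response frame that answers request_frame (illustrative only)."""
    return {
        "type": "response",
        # Echo the id so the server can find the matching pending future.
        "request_id": request_frame["request_id"],
        # The server uses these two keys for the HTTP status and JSON body.
        "payload": {"status": status, "body": body},
    }
```

The server ignores any frame whose `type` is not `response` or whose `request_id` has no pending future, so echoing the id unchanged is the part that matters.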
POST /v1/chat/completions
Your platform's internal services call this endpoint to send requests to a connected client. It should not be publicly reachable.
    @app.post("/v1/chat/completions")
    async def forward(request: Request):
        # Take a local reference so a disconnect mid-request cannot leave us with None.
        ws = connection
        if ws is None:
            return JSONResponse(status_code=503, content={"error": {"message": "no client connected"}})

        body = await request.json()
        request_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        pending[request_id] = future
        try:
            # Sending inside the try block ensures the pending entry is cleaned up on failure.
            await ws.send_json({
                "type": "request",
                "request_id": request_id,
                "payload": {
                    "method": "POST",
                    "path": "/v1/chat/completions",
                    "headers": {"content-type": "application/json"},
                    "body": body,
                },
            })
            result = await asyncio.wait_for(future, timeout=30.0)
        except asyncio.TimeoutError:
            return JSONResponse(status_code=504, content={"error": {"message": "client response timed out"}})
        finally:
            pending.pop(request_id, None)
        return JSONResponse(status_code=result.get("status", 200), content=result.get("body", {}))
The server wraps the request body in a request frame, assigns it a unique request_id, and pushes it down the WebSocket.
It awaits a matching response frame from the client (30-second timeout) and returns the result as a regular HTTP response.
Returns 504 if the client doesn't respond in time.
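The request/response matching described here can be tried without FastAPI or a real socket. The sketch below isolates the pattern: register a future under a fresh `request_id`, send the frame, and wait with a timeout. The names `round_trip`, `on_frame`, and `demo`, and the two stand-in clients, are hypothetical and exist only for this illustration.

```python
from __future__ import annotations

import asyncio
import uuid
from typing import Awaitable, Callable, Optional

pending: dict[str, asyncio.Future] = {}


async def round_trip(
    send: Callable[[dict], Awaitable[None]], payload: dict, timeout: float = 30.0
) -> Optional[dict]:
    """Send a request frame and wait for its response; None means timeout."""
    request_id = str(uuid.uuid4())
    future = asyncio.get_running_loop().create_future()
    pending[request_id] = future
    try:
        await send({"type": "request", "request_id": request_id, "payload": payload})
        return await asyncio.wait_for(future, timeout=timeout)
    except asyncio.TimeoutError:
        return None  # an HTTP handler would map this to a 504
    finally:
        pending.pop(request_id, None)


def on_frame(frame: dict) -> None:
    """Resolve the pending future that matches a response frame."""
    future = pending.get(frame.get("request_id"))
    if frame.get("type") == "response" and future is not None and not future.done():
        future.set_result(frame.get("payload", {}))


async def demo() -> tuple[Optional[dict], Optional[dict]]:
    async def echo_client(frame: dict) -> None:
        # Stand-in for the WebSocket: answer immediately with the same payload.
        on_frame({
            "type": "response",
            "request_id": frame["request_id"],
            "payload": {"status": 200, "body": frame["payload"]},
        })

    async def silent_client(frame: dict) -> None:
        pass  # never answers, so the wait times out

    answered = await round_trip(echo_client, {"hello": "world"})
    timed_out = await round_trip(silent_client, {"hello": "world"}, timeout=0.05)
    return answered, timed_out
```

Running `asyncio.run(demo())` exercises both paths: the first call resolves through `on_frame`, the second times out. In both cases the `finally` clause removes the entry from `pending`, so abandoned requests do not accumulate.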