Custom Backend Example
Using your own backend API to handle AI requests with streaming
Streaming
Custom API
Backend
SSE
Overview
This example shows how to use @brander/sdk with your own backend API using streaming. This approach gives you full control over AI processing while providing progressive UI loading through Server-Sent Events (SSE).
Frontend Implementation
custom-backend-app.tsx
import Brander, { sseStream } from "@brander/sdk";
/**
 * Renders the Brander widget and routes its streaming queries to a custom
 * backend endpoint via `sseStream`.
 *
 * NOTE(review): `YOUR_API_KEY` is a placeholder that is not defined in this
 * snippet; supply it from your own configuration.
 */
function CustomBackendApp() {
  const streamEndpoint = "https://api.yourcompany.com/ai/stream";

  return (
    <Brander
      betaKey="bux_your_token"
      projectId="custom-backend"
      width="100%"
      height="700px"
      onQueryStream={(params) => {
        // Pass the SDK-provided params through unchanged and attach the
        // bearer token the backend expects.
        const requestOptions = {
          params,
          headers: { Authorization: `Bearer ${YOUR_API_KEY}` },
        };
        return sseStream(streamEndpoint, requestOptions);
      }}
    />
  );
}
Backend Implementation (Streaming SSE)
Node.js / Express
backend-api.js
const express = require("express");
const Anthropic = require("@anthropic-ai/sdk");
// Anthropic client; the API key comes from the ANTHROPIC_API_KEY environment variable.
const anthropic = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });

// Express application with JSON request-body parsing enabled.
const app = express();
app.use(express.json());
/**
 * Writes one AG-UI event to the response as a Server-Sent Events `data:` frame.
 * A `timestamp` (milliseconds since the epoch) is appended to every event.
 *
 * @param {object} res - The open SSE response to write to.
 * @param {object} event - Event payload; must carry a `type` field.
 */
function sendEvent(res, event) {
  res.write(`data: ${JSON.stringify({ ...event, timestamp: Date.now() })}\n\n`);
}

/**
 * POST /ai/stream
 *
 * Streams an Anthropic completion to the client as AG-UI events over SSE.
 * Reads `messages`, `tools.anthropic` and `max_tokens` from the JSON body.
 * Emits RUN_STARTED, then TOOL_CALL_* / TEXT_MESSAGE_CONTENT events as the
 * model produces output, then RUN_FINISHED. Any failure is reported to the
 * client as a RUN_ERROR event before the response is closed.
 */
app.post("/ai/stream", async (req, res) => {
  // req.body may be undefined when the request was not parsed as JSON;
  // fall back to an empty object so the failure surfaces as RUN_ERROR below
  // instead of an unhandled rejection that leaves the request hanging.
  const { messages, tools, max_tokens } = req.body ?? {};

  // Set SSE headers
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  // One id for the whole run so RUN_STARTED and RUN_FINISHED can be correlated.
  const runId = `run-${Date.now()}`;
  let clientDisconnected = false;

  try {
    // Stream from Anthropic
    const stream = anthropic.messages.stream({
      model: "claude-sonnet-4-20250514",
      // `||` is deliberate: 0 is not a usable token budget either.
      max_tokens: max_tokens || 4000,
      messages,
      tools: tools?.anthropic,
    });

    // Stop the upstream request if the client goes away before we finish,
    // so we do not keep generating (and paying for) tokens nobody will read.
    res.on("close", () => {
      if (!res.writableEnded) {
        clientDisconnected = true;
        stream.abort();
      }
    });

    // Id of the tool_use block currently being streamed, if any.
    let currentToolId = null;

    sendEvent(res, { type: "RUN_STARTED", runId });

    // Convert Anthropic events to AG-UI events
    for await (const event of stream) {
      if (event.type === "content_block_start") {
        const block = event.content_block;
        if (block.type === "tool_use") {
          currentToolId = block.id;
          sendEvent(res, {
            type: "TOOL_CALL_START",
            toolCallId: block.id,
            toolCallName: block.name,
          });
        }
      } else if (event.type === "content_block_delta") {
        const delta = event.delta;
        if (delta.type === "input_json_delta" && currentToolId) {
          sendEvent(res, {
            type: "TOOL_CALL_ARGS",
            toolCallId: currentToolId,
            delta: delta.partial_json,
          });
        } else if (delta.type === "text_delta") {
          sendEvent(res, { type: "TEXT_MESSAGE_CONTENT", delta: delta.text });
        }
      } else if (event.type === "content_block_stop" && currentToolId) {
        sendEvent(res, { type: "TOOL_CALL_END", toolCallId: currentToolId });
        currentToolId = null;
      }
    }

    sendEvent(res, { type: "RUN_FINISHED", runId });
    res.end();
  } catch (error) {
    // The abort triggered by a client disconnect lands here; there is no one
    // left to notify, so just stop.
    if (clientDisconnected) {
      return;
    }
    console.error("Stream error:", error);
    sendEvent(res, { type: "RUN_ERROR", message: error.message });
    res.end();
  }
});
app.listen(3000, () => {
console.log("Backend API running on port 3000");
});Python / FastAPI
backend_api.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from anthropic import Anthropic
import json
import time
# Anthropic client used by the streaming endpoint.
anthropic = Anthropic()
# FastAPI application that hosts the streaming endpoint.
app = FastAPI()
@app.post("/ai/stream")
async def stream_endpoint(request: dict):
messages = request.get("messages", [])
tools = request.get("tools", {}).get("anthropic", [])
max_tokens = request.get("max_tokens", 4000)
async def generate():
# Send RUN_STARTED
yield f"data: {json.dumps({'type': 'RUN_STARTED', 'runId': f'run-{int(time.time() * 1000)}', 'timestamp': int(time.time() * 1000)})}\n\n"
with anthropic.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=max_tokens,
messages=messages,
tools=tools,
) as stream:
current_tool_id = None
for event in stream:
if event.type == "content_block_start":
block = event.content_block
if block.type == "tool_use":
current_tool_id = block.id
yield f"data: {json.dumps({'type': 'TOOL_CALL_START', 'toolCallId': block.id, 'toolCallName': block.name, 'timestamp': int(time.time() * 1000)})}\n\n"
elif event.type == "content_block_delta":
delta = event.delta
if hasattr(delta, 'partial_json') and current_tool_id:
yield f"data: {json.dumps({'type': 'TOOL_CALL_ARGS', 'toolCallId': current_tool_id, 'delta': delta.partial_json, 'timestamp': int(time.time() * 1000)})}\n\n"
elif hasattr(delta, 'text'):
yield f"data: {json.dumps({'type': 'TEXT_MESSAGE_CONTENT', 'delta': delta.text, 'timestamp': int(time.time() * 1000)})}\n\n"
elif event.type == "content_block_stop" and current_tool_id:
yield f"data: {json.dumps({'type': 'TOOL_CALL_END', 'toolCallId': current_tool_id, 'timestamp': int(time.time() * 1000)})}\n\n"
current_tool_id = None
# Send RUN_FINISHED
yield f"data: {json.dumps({'type': 'RUN_FINISHED', 'runId': f'run-{int(time.time() * 1000)}', 'timestamp': int(time.time() * 1000)})}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")Benefits of Custom Backend
- Security: Keep API keys secure on your server
- Streaming: Progressive UI loading with AG-UI events
- Rate Limiting: Control usage and costs
- Caching: Cache responses for better performance
- Data Processing: Transform data before sending to AI
- Logging: Track all AI interactions
- Custom Logic: Add business rules and validation
AG-UI Event Reference
Your backend should emit these SSE events:
- RUN_STARTED — Emitted when the stream begins
- TOOL_CALL_START — Emitted when a tool call begins
- TOOL_CALL_ARGS — Partial JSON arguments (streaming)
- TOOL_CALL_END — Emitted when a tool call completes
- TEXT_MESSAGE_CONTENT — Text content chunks
- RUN_FINISHED — Emitted when the stream ends
- RUN_ERROR — Emitted on errors