226 changes: 217 additions & 9 deletions DESIGN.md
- `WithAutoRouterInterceptor(i)` — Add interceptor to chain
- `WithAutoRouterHTTPClient(c)` — Custom HTTP client
- `WithAutoRouterFallbackProvider(p)` — Provider when detection fails
- `WithAutoRouterWebSocket(upgrader, dialer)` — Enable WebSocket mode (opt-in, see [WebSocket Mode](#websocket-mode))
- `WithAutoRouterWSBillingCallback(cb)` — Per-turn billing callback for WebSocket connections

**Example:**

All built-in providers implement this interface for streaming support.

### Usage Extraction

**OpenAI Chat Completions**: Usage is sent in the final chunk before `[DONE]`:

```json
data: {"id":"...","usage":{"prompt_tokens":100,"completion_tokens":50,"total_tokens":150}}
data: [DONE]
```

**OpenAI Responses API**: Usage is in the `response.completed` event. The `StreamingMultiAPIExtractor` automatically detects the API type from the request context and dispatches to the correct streaming extractor:

```json
data: {"type":"response.created","response":{"id":"resp_123","model":"gpt-4o"}}
data: {"type":"response.output_text.delta","delta":"Hello"}
data: {"type":"response.completed","response":{"usage":{"input_tokens":10,"output_tokens":5,"total_tokens":15}}}
data: [DONE]
```

No `stream_options.include_usage` is needed for the Responses API — usage is always included in `response.completed`. The proxy automatically skips `stream_options` injection for Responses API requests.
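
The extraction itself reduces to matching on the event type. A minimal sketch (the `completedUsage` function name is illustrative, not the library's API; the field names follow the event shown above):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// usage mirrors the token fields carried by the response.completed event.
type usage struct {
	InputTokens  int `json:"input_tokens"`
	OutputTokens int `json:"output_tokens"`
	TotalTokens  int `json:"total_tokens"`
}

// completedUsage pulls usage out of one Responses API event payload,
// returning false for any event other than response.completed.
func completedUsage(event []byte) (usage, bool) {
	var e struct {
		Type     string `json:"type"`
		Response struct {
			Usage *usage `json:"usage"`
		} `json:"response"`
	}
	if err := json.Unmarshal(event, &e); err != nil || e.Type != "response.completed" || e.Response.Usage == nil {
		return usage{}, false
	}
	return *e.Response.Usage, true
}

func main() {
	u, ok := completedUsage([]byte(`{"type":"response.completed","response":{"usage":{"input_tokens":10,"output_tokens":5,"total_tokens":15}}}`))
	fmt.Println(ok, u.InputTokens, u.OutputTokens, u.TotalTokens) // true 10 5 15
}
```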

**Anthropic**: Usage is sent in `message_start` and `message_delta` events:

```json
data: {"type":"message_start","message":{"usage":{"input_tokens":100,"output_tokens":1}}}
data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":50}}
data: {"type":"message_stop"}
```

### Auto stream_options Injection

When `BillingCalculator` is configured and the request has `stream: true`, the proxy automatically injects `stream_options.include_usage` for **OpenAI-compatible Chat Completions** endpoints only:

```json
{
Expand All @@ -489,7 +502,13 @@ When `BillingCalculator` is configured and the request has `stream: true`, the p
}
```

This ensures providers return token usage in their streaming responses for billing calculation.

The following are **excluded** from this injection because they already include usage natively:

- **Responses API** — usage is always present in the `response.completed` event
- **Anthropic** — usage is sent in `message_start` and `message_delta` events
- **Bedrock** and **Google AI** — usage is included in their streaming event formats

### Efficient Flushing

After the stream completes, the billing callback is invoked with the extracted token usage.

---

## WebSocket Mode

The proxy supports the OpenAI Responses API WebSocket mode for persistent, multi-turn connections. This is useful for tool-call-heavy workflows where multiple `response.create` / `response.completed` cycles happen on a single connection.

### Adapter Pattern (Zero Dependencies)

The library defines abstract WebSocket interfaces — **no WebSocket library is vendored**. Consumers bring their own implementation (gorilla/websocket, nhooyr.io/websocket, etc.) and wire it in via thin adapters.

#### Interfaces

```go
// WSConn abstracts a WebSocket connection.
// gorilla/websocket's *Conn satisfies this directly.
type WSConn interface {
ReadMessage() (messageType int, p []byte, err error)
WriteMessage(messageType int, data []byte) error
Close() error
}

// WSUpgrader upgrades an HTTP request to a WebSocket connection.
type WSUpgrader interface {
Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (WSConn, error)
}

// WSDialer dials a WebSocket connection to an upstream server.
type WSDialer interface {
DialContext(ctx context.Context, urlStr string, requestHeader http.Header) (WSConn, *http.Response, error)
}

// WebSocketCapableProvider is implemented by providers that support WebSocket mode.
type WebSocketCapableProvider interface {
Provider
WebSocketURL(meta BodyMetadata) (*url.URL, error)
}
```

The OpenAI provider implements `WebSocketCapableProvider`. Other providers can opt in by implementing the same interface.
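
The opt-in can be as small as one extra method. For illustration (the `myProvider` type and its endpoint are hypothetical, and `BodyMetadata` is pared down to one field here):

```go
package main

import (
	"fmt"
	"net/url"
)

// BodyMetadata is a pared-down stand-in for the library's type.
type BodyMetadata struct {
	Model string
}

// myProvider is a hypothetical provider opting into WebSocket mode.
type myProvider struct {
	baseWS string // e.g. "wss://api.example.com"
}

// WebSocketURL satisfies the WebSocketCapableProvider method: given request
// metadata, return the upstream WebSocket endpoint to dial.
func (p *myProvider) WebSocketURL(meta BodyMetadata) (*url.URL, error) {
	return url.Parse(p.baseWS + "/v1/responses")
}

func main() {
	p := &myProvider{baseWS: "wss://api.example.com"}
	u, err := p.WebSocketURL(BodyMetadata{Model: "gpt-4o"})
	if err != nil {
		panic(err)
	}
	fmt.Println(u.String()) // wss://api.example.com/v1/responses
}
```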

#### gorilla/websocket Example

gorilla's `*websocket.Conn` already satisfies `WSConn` — no wrapper needed. You only need thin adapters for `Upgrader` and `Dialer` because their return types differ (`*websocket.Conn` vs `WSConn`):

```go
package myadapter

import (
"context"
"net/http"

"github.com/agentuity/llmproxy"
"github.com/gorilla/websocket"
)

// GorillaUpgrader wraps gorilla's Upgrader to satisfy llmproxy.WSUpgrader.
type GorillaUpgrader struct {
Upgrader websocket.Upgrader
}

func (u *GorillaUpgrader) Upgrade(w http.ResponseWriter, r *http.Request, h http.Header) (llmproxy.WSConn, error) {
conn, err := u.Upgrader.Upgrade(w, r, h)
if err != nil {
return nil, err
}
return conn, nil // *websocket.Conn satisfies WSConn
}

// GorillaDialer wraps gorilla's Dialer to satisfy llmproxy.WSDialer.
type GorillaDialer struct {
Dialer websocket.Dialer
}

func (d *GorillaDialer) DialContext(ctx context.Context, urlStr string, h http.Header) (llmproxy.WSConn, *http.Response, error) {
conn, resp, err := d.Dialer.DialContext(ctx, urlStr, h)
if err != nil {
return nil, resp, err
}
return conn, resp, nil // *websocket.Conn satisfies WSConn
}
```

#### Wiring It Together

```go
router := llmproxy.NewAutoRouter(
llmproxy.WithAutoRouterFallbackProvider(openaiProvider),
// In production, configure CheckOrigin to validate against trusted origins.
llmproxy.WithAutoRouterWebSocket(
&myadapter.GorillaUpgrader{
Upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
// Validate r.Header.Get("Origin") against a whitelist
return isAllowedOrigin(r)
},
},
},
&myadapter.GorillaDialer{
Dialer: websocket.Dialer{},
},
),
llmproxy.WithAutoRouterWSBillingCallback(func(turn int, meta llmproxy.ResponseMetadata, billing *llmproxy.BillingResult) {
log.Printf("Turn %d: model=%s prompt=%d completion=%d",
turn, meta.Model, meta.Usage.PromptTokens, meta.Usage.CompletionTokens)
if billing != nil {
log.Printf(" Cost: $%.6f", billing.TotalCost)
}
}),
)
router.RegisterProvider(openaiProvider)

http.Handle("/", router)
```

WebSocket support is **opt-in** — if `WithAutoRouterWebSocket` is not called, WebSocket upgrade requests are rejected and normal HTTP handling is unchanged.

### WebSocket Protocol

The OpenAI Responses API WebSocket mode operates at `wss://api.openai.com/v1/responses`:

| Aspect | Detail |
|--------|--------|
| **Client sends** | `{"type":"response.create","model":"gpt-4o","input":[...]}` |
| **Server sends** | Same events as SSE: `response.created`, `response.output_text.delta`, `response.completed`, etc. |
| **Multi-turn** | Client sends new `response.create` with `previous_response_id` on same connection |
| **Usage** | In `response.completed` → `response.usage.{input_tokens, output_tokens, total_tokens}` |
| **Frames** | JSON text frames — no `data:` prefix (unlike SSE) |

### WebSocket Flow

```text
+------------------+ +------------------+ +------------------+
| Client | | AutoRouter | | Upstream |
| | | | | (OpenAI) |
+--------+---------+ +--------+---------+ +--------+---------+
| | |
1. WS Upgrade --------> 2. Accept upgrade |
| 3. Read first message |
| (response.create) |
| 4. Detect provider/model |
| 5. Strip model prefix |
| 6. Enrich headers (auth) |
| 7. Dial upstream WS --------> 8. Accept
| 9. Forward first msg --------> |
| | |
| === Bidirectional Relay === |
| | |
10. response.create -----> Strip prefix ------------> response.create
| | |
| | <----------- response.created
response.created <------ | |
| | <----------- response.output_text.delta
text delta <---------------- | |
| | <----------- response.completed
response.completed <---- Extract usage, (includes usage)
| compute billing |
| | |
11. response.create -----> Strip prefix ------------> (next turn)
| ... ...
| | |
12. Close ----------------> Close both ------------> Close
```

### Billing

Billing is calculated **per turn** — each `response.completed` event triggers:

1. Usage extraction (`input_tokens`, `output_tokens`, `total_tokens`, cache details)
2. Cost calculation via `BillingCalculator` (if configured)
3. `WSBillingCallback` invocation with the turn number, response metadata, and billing result

For multi-turn connections, each turn is billed independently. The callback receives an incrementing turn counter so consumers can aggregate if needed.
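
Such aggregation can be sketched like this (the `BillingResult` stand-in is pared down to one field; the callback may fire from a relay goroutine, hence the mutex):

```go
package main

import (
	"fmt"
	"sync"
)

// BillingResult is a pared-down stand-in for the library's type.
type BillingResult struct {
	TotalCost float64
}

// connTotals aggregates per-turn billing results for one WebSocket connection.
type connTotals struct {
	mu    sync.Mutex
	turns int
	cost  float64
}

// onTurn is invoked once per response.completed event; billing is nil when
// no BillingCalculator is configured.
func (t *connTotals) onTurn(turn int, billing *BillingResult) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.turns++
	if billing != nil {
		t.cost += billing.TotalCost
	}
}

func main() {
	var totals connTotals
	totals.onTurn(1, &BillingResult{TotalCost: 0.0012})
	totals.onTurn(2, &BillingResult{TotalCost: 0.0034})
	totals.onTurn(3, nil)
	fmt.Printf("turns=%d cost=$%.4f\n", totals.turns, totals.cost)
}
```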

### Model Prefix Stripping

Works the same as HTTP mode — `openai/gpt-4o` is stripped to `gpt-4o` in all `response.create` messages. Non-`response.create` messages are forwarded byte-for-byte without modification.

### Connection Lifecycle

- Both relay goroutines use a `sync.Once`-guarded close — when either side closes, the other is closed immediately
- WebSocket close errors (`io.EOF`, "connection closed") are treated as normal termination, not errors
- The `ForwardWebSocket` method blocks until both relay goroutines complete
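
These rules can be sketched with a toy in-memory connection; only the `WSConn` shape comes from the library, while `relay` and `chanConn` are illustrative stand-ins:

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// WSConn matches the library's connection interface.
type WSConn interface {
	ReadMessage() (messageType int, p []byte, err error)
	WriteMessage(messageType int, data []byte) error
	Close() error
}

// relay pumps messages in both directions. When either pump exits (including
// a normal close), the sync.Once-guarded closeBoth shuts down both
// connections, which unblocks the opposite pump's pending ReadMessage.
func relay(client, upstream WSConn) {
	var once sync.Once
	closeBoth := func() {
		once.Do(func() {
			client.Close()
			upstream.Close()
		})
	}
	var wg sync.WaitGroup
	pump := func(src, dst WSConn) {
		defer wg.Done()
		defer closeBoth()
		for {
			mt, msg, err := src.ReadMessage()
			if err != nil {
				return // io.EOF / close errors count as normal termination
			}
			if err := dst.WriteMessage(mt, msg); err != nil {
				return
			}
		}
	}
	wg.Add(2)
	go pump(client, upstream)
	go pump(upstream, client)
	wg.Wait() // block until both relay goroutines complete
}

// chanConn is a toy in-memory WSConn for demonstration.
type chanConn struct {
	in, out chan []byte
	closed  chan struct{}
	once    sync.Once
}

func (c *chanConn) ReadMessage() (int, []byte, error) {
	select {
	case msg := <-c.in:
		return 1, msg, nil
	case <-c.closed:
		return 0, nil, io.EOF
	}
}

func (c *chanConn) WriteMessage(mt int, msg []byte) error {
	select {
	case c.out <- msg:
		return nil
	case <-c.closed:
		return io.ErrClosedPipe
	}
}

func (c *chanConn) Close() error {
	c.once.Do(func() { close(c.closed) })
	return nil
}

func main() {
	client := &chanConn{in: make(chan []byte, 1), out: make(chan []byte, 1), closed: make(chan struct{})}
	upstream := &chanConn{in: make(chan []byte, 1), out: make(chan []byte, 1), closed: make(chan struct{})}
	client.in <- []byte(`{"type":"response.create"}`)
	go func() {
		fmt.Printf("upstream got: %s\n", <-upstream.out)
		client.Close() // simulate the client hanging up
	}()
	relay(client, upstream)
	fmt.Println("both sides closed")
}
```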

---

## Providers

Nine providers are included. Six share the OpenAI-compatible base; three have fully custom implementations.

**OpenAI** — Wraps `openai_compatible` with support for multiple APIs:
- **Chat Completions** (`/v1/chat/completions`) — Standard messages-based API
- **Responses** (`/v1/responses`) — Newer API with `input` field, built-in tools support. Supports both HTTP (SSE streaming) and WebSocket modes.
- **Legacy Completions** (`/v1/completions`) — Older prompt-based API

The provider auto-detects the API type from the request body:
- `input` field → Responses API
- `prompt` field → Completions API
- `messages` field → Chat Completions API
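
That field-based detection amounts to a three-way switch; a minimal sketch (the `detectAPIType` name and string results are illustrative, not the provider's actual API):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// detectAPIType mirrors the documented field-based detection: the presence of
// input, prompt, or messages selects the API type.
func detectAPIType(body []byte) string {
	var m map[string]json.RawMessage
	if err := json.Unmarshal(body, &m); err != nil {
		return "unknown"
	}
	switch {
	case m["input"] != nil:
		return "responses"
	case m["prompt"] != nil:
		return "completions"
	case m["messages"] != nil:
		return "chat_completions"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(detectAPIType([]byte(`{"model":"gpt-4o","input":[{"role":"user","content":"hi"}]}`))) // responses
}
```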

The OpenAI provider implements `WebSocketCapableProvider`, enabling persistent WebSocket connections for multi-turn Responses API workflows when `WithAutoRouterWebSocket` is configured.

**Anthropic** — Custom body parser translates between the proxy's canonical format and Anthropic's Messages API. Custom extractor maps Anthropic's response shape (content blocks, stop_reason) back to `ResponseMetadata`. Auth uses the `x-api-key` header alongside an `anthropic-version` header.

**Groq, Fireworks, x.AI** — Each wraps `openai_compatible` with its own base URL and provider name. No custom parsing or extraction logic needed.
Matches the signature of `github.com/agentuity/go-common/logger` without requiring it as a dependency.
llmproxy/
├── apitype.go # API type detection and constants
├── autorouter.go # AutoRouter, provider/API auto-detection, streaming
├── autorouter_websocket.go # WebSocket forwarding, bidirectional relay
├── billing.go # CostInfo, CostLookup, BillingResult, CalculateCost
├── billing_calculator.go # BillingCalculator for streaming/non-streaming
├── detection.go # Provider detection from model/header
├── registry.go # Registry interface, MapRegistry
├── resolver.go # URLResolver interface
├── streaming.go # SSE parser, streaming types, usage extraction
├── websocket.go # WebSocket interfaces, message parsing, usage extraction
├── interceptors/
│ ├── addheader.go # AddHeaderInterceptor
│ ├── billing.go # BillingInterceptor
│ ├── googleai/ # Google AI Gemini
│ │ └── streaming_extractor.go # Google AI streaming
│ ├── groq/ # Groq (OpenAI-compatible)
│ ├── openai/ # OpenAI (Chat Completions + Responses + WebSocket)
│ ├── openai_compatible/ # Base for OpenAI-compatible providers
│ │ ├── multiapi.go # Multi-API parser/extractor dispatch
│ │ ├── streaming_extractor.go # Chat Completions SSE streaming
│ │ ├── responses_parser.go # Responses API parser
│ │ ├── responses_extractor.go # Responses API extractor
│ │ ├── responses_streaming_extractor.go # Responses API SSE streaming
│ │ └── websocket.go # WebSocket URL resolver
│ └── xai/ # x.AI (OpenAI-compatible)
└── examples/
└── basic/ # Multi-provider proxy server example (uses AutoRouter)